Skip to content

Commit a03d54e

Browse files
committed
Fix SNS v2 batch injection bug
1 parent f5bcaf7 commit a03d54e

1 file changed

Lines changed: 28 additions & 30 deletions

File tree

  • dd-java-agent/instrumentation/aws-java/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns

dd-java-agent/instrumentation/aws-java/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package datadog.trace.instrumentation.aws.v2.sns;
22

33
import static datadog.context.propagation.Propagators.defaultPropagator;
4-
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
5-
import static datadog.trace.api.datastreams.DataStreamsTags.create;
64
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
7-
import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER;
85

96
import datadog.context.Context;
107
import datadog.trace.api.Config;
@@ -28,6 +25,9 @@
2825

2926
public class SnsInterceptor implements ExecutionInterceptor {
3027

28+
// SQS subscriber limit; SNS inherits it when SQS is used as a subscriber
29+
private static final int MAX_MESSAGE_ATTRIBUTES = 10;
30+
3131
public static final ExecutionAttribute<Context> CONTEXT_ATTRIBUTE =
3232
InstanceStore.of(ExecutionAttribute.class)
3333
.putIfAbsent("DatadogContext", () -> new ExecutionAttribute<>("DatadogContext"));
@@ -38,10 +38,12 @@ private SdkBytes getMessageAttributeValueToInject(
3838
StringBuilder jsonBuilder = new StringBuilder();
3939
jsonBuilder.append('{');
4040
if (traceConfig().isDataStreamsEnabled()) {
41-
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(snsTopicName));
41+
DataStreamsTags tags =
42+
DataStreamsTags.create("sns", DataStreamsTags.Direction.OUTBOUND, snsTopicName);
43+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
4244
context = context.with(dsmContext);
4345
}
44-
defaultPropagator().inject(context, jsonBuilder, SETTER);
46+
defaultPropagator().inject(context, jsonBuilder, TextMapInjectAdapter.SETTER);
4547
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
4648
jsonBuilder.append('}');
4749
return SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8);
@@ -57,9 +59,7 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
5759
// Injecting the trace context into SNS messageAttributes.
5860
if (context.request() instanceof PublishRequest) {
5961
PublishRequest request = (PublishRequest) context.request();
60-
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
61-
// the limit still applies here
62-
if (request.messageAttributes().size() < 10) {
62+
if (request.messageAttributes().size() < MAX_MESSAGE_ATTRIBUTES) {
6363
// Get topic name for DSM
6464
String snsTopicArn = request.topicArn();
6565
if (null == snsTopicArn) {
@@ -70,17 +70,11 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
7070
}
7171

7272
String snsTopicName = snsTopicArn.substring(snsTopicArn.lastIndexOf(':') + 1);
73-
Map<String, MessageAttributeValue> modifiedMessageAttributes =
74-
new HashMap<>(request.messageAttributes());
75-
modifiedMessageAttributes.put(
76-
"_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON
77-
// strings https://github.com/DataDog/datadog-lambda-js/pull/269
78-
MessageAttributeValue.builder()
79-
.dataType("Binary")
80-
.binaryValue(
81-
this.getMessageAttributeValueToInject(executionAttributes, snsTopicName))
82-
.build());
83-
return request.toBuilder().messageAttributes(modifiedMessageAttributes).build();
73+
Map<String, MessageAttributeValue> messageAttributes =
74+
withDatadogAttribute(
75+
request.messageAttributes(),
76+
this.getMessageAttributeValueToInject(executionAttributes, snsTopicName));
77+
return request.toBuilder().messageAttributes(messageAttributes).build();
8478
}
8579
return request;
8680
} else if (context.request() instanceof PublishBatchRequest) {
@@ -89,24 +83,28 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
8983
String snsTopicArn = request.topicArn();
9084
String snsTopicName = snsTopicArn.substring(snsTopicArn.lastIndexOf(':') + 1);
9185
ArrayList<PublishBatchRequestEntry> entries = new ArrayList<>();
92-
final SdkBytes sdkBytes =
93-
this.getMessageAttributeValueToInject(executionAttributes, snsTopicName);
86+
SdkBytes value = this.getMessageAttributeValueToInject(executionAttributes, snsTopicName);
9487
for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) {
95-
if (entry.messageAttributes().size() < 10) {
96-
Map<String, MessageAttributeValue> modifiedMessageAttributes =
97-
new HashMap<>(entry.messageAttributes());
98-
modifiedMessageAttributes.put(
99-
"_datadog",
100-
MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build());
101-
entries.add(entry.toBuilder().messageAttributes(modifiedMessageAttributes).build());
88+
if (entry.messageAttributes().size() < MAX_MESSAGE_ATTRIBUTES) {
89+
Map<String, MessageAttributeValue> messageAttributes =
90+
withDatadogAttribute(entry.messageAttributes(), value);
91+
entry = entry.toBuilder().messageAttributes(messageAttributes).build();
10292
}
93+
entries.add(entry);
10394
}
10495
return request.toBuilder().publishBatchRequestEntries(entries).build();
10596
}
10697
return context.request();
10798
}
10899

109-
private DataStreamsTags getTags(String snsTopicName) {
110-
return create("sns", OUTBOUND, snsTopicName);
100+
private static Map<String, MessageAttributeValue> withDatadogAttribute(
101+
Map<String, MessageAttributeValue> attributes, SdkBytes value) {
102+
// copy since the original map may be unmodifiable
103+
Map<String, MessageAttributeValue> modified = new HashMap<>(attributes);
104+
// Use Binary since SNS subscription filter policies fail silently with JSON strings
105+
// https://github.com/DataDog/datadog-lambda-js/pull/269
106+
modified.put(
107+
"_datadog", MessageAttributeValue.builder().dataType("Binary").binaryValue(value).build());
108+
return modified;
111109
}
112110
}

0 commit comments

Comments
 (0)