11package datadog .trace .instrumentation .aws .v2 .sns ;
22
33import static datadog .context .propagation .Propagators .defaultPropagator ;
4- import static datadog .trace .api .datastreams .DataStreamsTags .Direction .OUTBOUND ;
5- import static datadog .trace .api .datastreams .DataStreamsTags .create ;
64import static datadog .trace .bootstrap .instrumentation .api .AgentTracer .traceConfig ;
7- import static datadog .trace .instrumentation .aws .v2 .sns .TextMapInjectAdapter .SETTER ;
85
96import datadog .context .Context ;
107import datadog .trace .api .Config ;
2825
2926public class SnsInterceptor implements ExecutionInterceptor {
3027
28+ // SQS subscriber limit; SNS inherits it when SQS is used as a subscriber
29+ private static final int MAX_MESSAGE_ATTRIBUTES = 10 ;
30+
3131 public static final ExecutionAttribute <Context > CONTEXT_ATTRIBUTE =
3232 InstanceStore .of (ExecutionAttribute .class )
3333 .putIfAbsent ("DatadogContext" , () -> new ExecutionAttribute <>("DatadogContext" ));
@@ -38,10 +38,12 @@ private SdkBytes getMessageAttributeValueToInject(
3838 StringBuilder jsonBuilder = new StringBuilder ();
3939 jsonBuilder .append ('{' );
4040 if (traceConfig ().isDataStreamsEnabled ()) {
41- DataStreamsContext dsmContext = DataStreamsContext .fromTags (getTags (snsTopicName ));
41+ DataStreamsTags tags =
42+ DataStreamsTags .create ("sns" , DataStreamsTags .Direction .OUTBOUND , snsTopicName );
43+ DataStreamsContext dsmContext = DataStreamsContext .fromTags (tags );
4244 context = context .with (dsmContext );
4345 }
44- defaultPropagator ().inject (context , jsonBuilder , SETTER );
46+ defaultPropagator ().inject (context , jsonBuilder , TextMapInjectAdapter . SETTER );
4547 jsonBuilder .setLength (jsonBuilder .length () - 1 ); // Remove the last comma
4648 jsonBuilder .append ('}' );
4749 return SdkBytes .fromString (jsonBuilder .toString (), StandardCharsets .UTF_8 );
@@ -57,9 +59,7 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
5759 // Injecting the trace context into SNS messageAttributes.
5860 if (context .request () instanceof PublishRequest ) {
5961 PublishRequest request = (PublishRequest ) context .request ();
60- // 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
61- // the limit still applies here
62- if (request .messageAttributes ().size () < 10 ) {
62+ if (request .messageAttributes ().size () < MAX_MESSAGE_ATTRIBUTES ) {
6363 // Get topic name for DSM
6464 String snsTopicArn = request .topicArn ();
6565 if (null == snsTopicArn ) {
@@ -70,17 +70,11 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
7070 }
7171
7272 String snsTopicName = snsTopicArn .substring (snsTopicArn .lastIndexOf (':' ) + 1 );
73- Map <String , MessageAttributeValue > modifiedMessageAttributes =
74- new HashMap <>(request .messageAttributes ());
75- modifiedMessageAttributes .put (
76- "_datadog" , // Use Binary since SNS subscription filter policies fail silently with JSON
77- // strings https://github.com/DataDog/datadog-lambda-js/pull/269
78- MessageAttributeValue .builder ()
79- .dataType ("Binary" )
80- .binaryValue (
81- this .getMessageAttributeValueToInject (executionAttributes , snsTopicName ))
82- .build ());
83- return request .toBuilder ().messageAttributes (modifiedMessageAttributes ).build ();
73+ Map <String , MessageAttributeValue > messageAttributes =
74+ withDatadogAttribute (
75+ request .messageAttributes (),
76+ this .getMessageAttributeValueToInject (executionAttributes , snsTopicName ));
77+ return request .toBuilder ().messageAttributes (messageAttributes ).build ();
8478 }
8579 return request ;
8680 } else if (context .request () instanceof PublishBatchRequest ) {
@@ -89,24 +83,30 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
8983 String snsTopicArn = request .topicArn ();
9084 String snsTopicName = snsTopicArn .substring (snsTopicArn .lastIndexOf (':' ) + 1 );
9185 ArrayList <PublishBatchRequestEntry > entries = new ArrayList <>();
92- final SdkBytes sdkBytes =
93- this .getMessageAttributeValueToInject (executionAttributes , snsTopicName );
9486 for (PublishBatchRequestEntry entry : request .publishBatchRequestEntries ()) {
95- if ( entry . messageAttributes (). size () < 10 ) {
96- Map < String , MessageAttributeValue > modifiedMessageAttributes =
97- new HashMap <>( entry . messageAttributes ());
98- modifiedMessageAttributes . put (
99- "_datadog" ,
100- MessageAttributeValue . builder (). dataType ( "Binary" ). binaryValue ( sdkBytes ). build ( ));
101- entries . add ( entry .toBuilder ().messageAttributes (modifiedMessageAttributes ).build () );
87+ PublishBatchRequestEntry modified = entry ;
88+ if ( entry . messageAttributes (). size () < MAX_MESSAGE_ATTRIBUTES ) {
89+ Map < String , MessageAttributeValue > messageAttributes =
90+ withDatadogAttribute (
91+ entry . messageAttributes () ,
92+ this . getMessageAttributeValueToInject ( executionAttributes , snsTopicName ));
93+ modified = entry .toBuilder ().messageAttributes (messageAttributes ).build ();
10294 }
95+ entries .add (modified );
10396 }
10497 return request .toBuilder ().publishBatchRequestEntries (entries ).build ();
10598 }
10699 return context .request ();
107100 }
108101
109- private DataStreamsTags getTags (String snsTopicName ) {
110- return create ("sns" , OUTBOUND , snsTopicName );
102+ private static Map <String , MessageAttributeValue > withDatadogAttribute (
103+ Map <String , MessageAttributeValue > attributes , SdkBytes value ) {
104+ // copy since the original map may be unmodifiable
105+ Map <String , MessageAttributeValue > modified = new HashMap <>(attributes );
106+ // Use Binary since SNS subscription filter policies fail silently with JSON strings
107+ // https://github.com/DataDog/datadog-lambda-js/pull/269
108+ modified .put (
109+ "_datadog" , MessageAttributeValue .builder ().dataType ("Binary" ).binaryValue (value ).build ());
110+ return modified ;
111111 }
112112}
0 commit comments