-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathDefaultSNSBatchPublisher.java
More file actions
189 lines (171 loc) · 9.97 KB
/
DefaultSNSBatchPublisher.java
File metadata and controls
189 lines (171 loc) · 9.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package com.here.xyz.pub.impl;
import com.here.xyz.models.hub.Subscription;
import com.here.xyz.pub.mapper.IPubMsgMapper;
import com.here.xyz.pub.models.PubConfig;
import com.here.xyz.pub.models.PubLogConstants;
import com.here.xyz.pub.models.PubTransactionData;
import com.here.xyz.pub.models.PublishEntryDTO;
import com.here.xyz.pub.util.AwsUtil;
import com.here.xyz.pub.util.MessageUtil;
import com.here.xyz.pub.util.PubUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.model.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
 * {@link IPublisher} implementation that converts transactions into messages and publishes
 * them to the subscription's SNS topic using the batch publish API.
 *
 * <p>Transactions are sent in the order in which they appear in the supplied list. Messages are
 * grouped into batches that are flushed when the entry-count limit is reached, when adding the
 * next message would exceed the payload cap, or when the last transaction has been added.
 * Publishing stops with an exception at the first entry that is not reported as successful.
 */
public class DefaultSNSBatchPublisher implements IPublisher {
    private static final Logger logger = LogManager.getLogger();

    /** Maximum number of entries placed into one batch request. */
    private static final int MAX_ALLOWED_BATCH_SIZE = 10;

    /** Aggregate message payload cap per batch, in bytes; keeps some buffer below 256K. */
    private static final long MAX_ALLOWED_PAYLOAD_SIZE = 240 * 1024;

    /** Prefix of a batch entry id; the suffix is the entry's zero-based position in its batch. */
    private static final String MSG_ID_PREFIX = "msg_idx_";

    /**
     * Converts and publishes the given transactions to the SNS topic of the subscription.
     *
     * <p>After each successfully published message the last published transaction ids are
     * recorded on {@code pubDTO}. Publish statistics are always logged, also on failure.
     *
     * @param pubCfg  publisher configuration (AWS region, detailed-logging switch)
     * @param sub     subscription that provides the id, the source space and the target topic
     * @param txnList transactions to publish, in publish order
     * @param pubDTO  receives the ids of the last successfully published transaction
     * @throws Exception if a message is reported as failed, or no result is reported for it
     */
    @Override
    public void publishTransactions(final PubConfig pubCfg, final Subscription sub,
                                    final List<PubTransactionData> txnList,
                                    final PublishEntryDTO pubDTO) throws Exception {
        final String subId = sub.getId();
        final String spaceId = sub.getSource();
        final String snsTopic = PubUtil.getSnsTopicARN(sub);
        final long lotStartTS = System.currentTimeMillis();

        // Number of records that belong to completely published batches
        int publishedRecCnt = 0;

        // State of the batch that is currently being assembled
        final int txnListSize = txnList.size();
        final List<PublishBatchRequestEntry> batchEntries = new ArrayList<>(MAX_ALLOWED_BATCH_SIZE);
        long aggrBatchPayloadSize = 0; // aggregated batch payload size in bytes
        int txnRecCnt = 0;

        try {
            final IPubMsgMapper msgMapper = MessageUtil.getMsgMapperInstance(sub);
            // TODO : Support multi-region based on subscription configuration.
            //      We may require region specific publisher job for the respective subscriptions.
            final SnsAsyncClient snsClient = AwsUtil.getSnsAsyncClient(pubCfg.AWS_DEFAULT_REGION);

            // Publish all transactions on SNS Topic (in the same order they were fetched)
            for (final PubTransactionData txnData : txnList) {
                txnRecCnt++;

                // Convert transaction payload into expected publishable format
                final String pubFormat = msgMapper.mapToPublishableFormat(sub, txnData);

                // Prepare SNS Notification message
                final String msg = MessageUtil.compressAndEncodeToString(pubFormat);
                // The payload cap is expressed in bytes, so measure the encoded size rather than
                // the number of UTF-16 chars (both are equal for pure ASCII content).
                // NOTE(review): message attributes are not counted here; presumably the buffer
                // below 256K is meant to absorb them - confirm.
                final int msgSize = msg.getBytes(StandardCharsets.UTF_8).length;
                final Map<String, MessageAttributeValue> msgAttrMap = populateMessageAttributeMap(txnData, sub, spaceId);

                // Flush the accumulated batch first if the current message would not fit into it.
                // A message that alone exceeds the cap is still sent, in a batch of its own.
                if (!batchEntries.isEmpty() && msgSize + aggrBatchPayloadSize > MAX_ALLOWED_PAYLOAD_SIZE) {
                    publishedRecCnt += flushBatch(batchEntries, snsTopic, snsClient, txnList, subId, publishedRecCnt, pubCfg, pubDTO);
                    aggrBatchPayloadSize = 0;
                }

                // Add current message to the batch; its id encodes its position within the batch
                final PublishBatchRequestEntry batchEntry = PublishBatchRequestEntry.builder()
                        .message(msg)
                        .messageAttributes(msgAttrMap)
                        .id(MSG_ID_PREFIX + batchEntries.size())
                        .build();
                batchEntries.add(batchEntry);
                aggrBatchPayloadSize += msgSize;

                // Flush if the count limit is reached or this was the last record in the list
                final boolean batchFull = batchEntries.size() >= MAX_ALLOWED_BATCH_SIZE;
                final boolean lastRecord = txnRecCnt >= txnListSize;
                if (batchFull || lastRecord) {
                    publishedRecCnt += flushBatch(batchEntries, snsTopic, snsClient, txnList, subId, publishedRecCnt, pubCfg, pubDTO);
                    aggrBatchPayloadSize = 0;
                }
            }
        }
        finally {
            final long lotTimeTaken = System.currentTimeMillis() - lotStartTS;
            logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}",
                    snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId());
        }
    }

    /**
     * Publishes the accumulated batch, verifies its result and empties the batch list.
     * The list is left untouched when publishing or verification throws.
     *
     * @return number of entries that were in the published batch
     */
    private int flushBatch(final List<PublishBatchRequestEntry> batchEntries,
                           final String snsTopic, final SnsAsyncClient snsClient,
                           final List<PubTransactionData> txnList, final String subId,
                           final int publishedRecCnt,
                           final PubConfig pubCfg, final PublishEntryDTO pubDTO) throws Exception {
        publishBatchEntriesAndCheckResult(batchEntries, snsTopic, snsClient, txnList, subId, publishedRecCnt, pubCfg, pubDTO);
        final int flushedCnt = batchEntries.size();
        batchEntries.clear();
        return flushedCnt;
    }

    /**
     * Sends one batch request and checks the reported outcome of every entry, in batch order.
     *
     * <p>For each entry reported as successful the transaction ids are recorded on
     * {@code pubDTO}. The first entry that is reported as failed, or for which no result is
     * reported at all, aborts the check with an exception.
     *
     * @param batchEntries    entries of the batch; entry {@code i} carries id {@code MSG_ID_PREFIX + i}
     * @param snsTopic        ARN of the target topic
     * @param snsClient       client used to publish
     * @param txnList         complete transaction list the batch was built from
     * @param subId           subscription id, used for logging
     * @param publishedRecCnt index in {@code txnList} of the transaction behind the first batch entry
     * @param pubCfg          publisher configuration (detailed-logging switch)
     * @param pubDTO          receives the ids of the last successfully published transaction
     * @throws Exception if an entry failed or has no reported result
     */
    private void publishBatchEntriesAndCheckResult(final List<PublishBatchRequestEntry> batchEntries,
                                                   final String snsTopic, final SnsAsyncClient snsClient,
                                                   final List<PubTransactionData> txnList, final String subId,
                                                   final int publishedRecCnt,
                                                   final PubConfig pubCfg, final PublishEntryDTO pubDTO) throws Exception {
        // Prepare batch request
        final PublishBatchRequest batchRequest = PublishBatchRequest.builder()
                .topicArn(snsTopic)
                .publishBatchRequestEntries(batchEntries)
                .build();

        // Publish the batch request and wait for its completion
        final CompletableFuture<PublishBatchResponse> futureResponse = snsClient.publishBatch(batchRequest);
        final PublishBatchResponse result = futureResponse.join();

        for (int i = 0; i < batchEntries.size(); i++) {
            final String entryId = MSG_ID_PREFIX + i;
            final int txnRecIdx = publishedRecCnt + i;
            // One-based message number; computed as an int so that it is never string-concatenated
            // digit by digit (index 0 must be reported as "1", not "01").
            final int msgNo = txnRecIdx + 1;
            final PubTransactionData txnData = txnList.get(txnRecIdx);
            final long msgTxnId = txnData.getTxnId();
            final long msgTxnRecId = txnData.getTxnRecId();
            final String featureId = txnData.getFeatureId();
            final String action = txnData.getAction();

            if (isReportedSuccessful(result, entryId)) {
                // Record last successfully published transaction Id's
                pubDTO.setLastTxnId(msgTxnId);
                pubDTO.setLastTxnRecId(msgTxnRecId);
                if (pubCfg.ENABLE_TXN_PUB_DETAILED_LOGGING) {
                    logger.info("Message no. [{}], txnId={}, txnRecId={}, featureId={}, action={}, published to SNS [{}] for subId [{}].",
                            msgNo, msgTxnId, msgTxnRecId, featureId, action, snsTopic, subId);
                }
                continue;
            }

            if (isReportedFailed(result, entryId)) {
                if (pubCfg.ENABLE_TXN_PUB_DETAILED_LOGGING) {
                    logger.info("Message no. [{}], txnId={}, txnRecId={}, featureId={}, action={}, failed while publishing to SNS [{}] for subId [{}].",
                            msgNo, msgTxnId, msgTxnRecId, featureId, action, snsTopic, subId);
                }
                // Raise error here to stop publishing
                throw new Exception("Message no. [" + msgNo + "], txnId=" + msgTxnId + ", " +
                        "txnRecId=" + msgTxnRecId + ", failed while publishing to SNS [" + snsTopic + "] for subId [" + subId + "]");
            }

            // Neither found in success list, nor in failed list
            if (pubCfg.ENABLE_TXN_PUB_DETAILED_LOGGING) {
                logger.info("No result found for Message no. [{}], txnId={}, txnRecId={}, featureId={}, action={}, while publishing to SNS [{}] for subId [{}].",
                        msgNo, msgTxnId, msgTxnRecId, featureId, action, snsTopic, subId);
            }
            // Raise error here to stop publishing
            throw new Exception("No result found for Message no. [" + msgNo + "], txnId=" + msgTxnId + ", " +
                    "txnRecId=" + msgTxnRecId + ", while publishing to SNS [" + snsTopic + "] for subId [" + subId + "]");
        }
    }

    /** Tells whether the response lists the given entry id among the successful entries. */
    private static boolean isReportedSuccessful(final PublishBatchResponse result, final String entryId) {
        if (!result.hasSuccessful()) {
            return false;
        }
        for (final PublishBatchResultEntry success : result.successful()) {
            if (entryId.equals(success.id())) {
                return true;
            }
        }
        return false;
    }

    /** Tells whether the response lists the given entry id among the failed entries. */
    private static boolean isReportedFailed(final PublishBatchResponse result, final String entryId) {
        if (!result.hasFailed()) {
            return false;
        }
        for (final BatchResultErrorEntry error : result.failed()) {
            if (entryId.equals(error.id())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the message attributes for one transaction: the fixed attributes
     * {@code action}, {@code space} and {@code featureId}, followed by the custom fields that
     * {@link MessageUtil#addCustomFieldsToAttributeMap} adds for the subscription.
     *
     * @param txnData transaction the attributes are taken from
     * @param sub     subscription passed on for the custom attributes
     * @param spaceId value of the {@code space} attribute
     * @return a new, mutable attribute map
     */
    private Map<String, MessageAttributeValue> populateMessageAttributeMap(
            final PubTransactionData txnData, final Subscription sub, final String spaceId) {
        final Map<String, MessageAttributeValue> msgAttrMap = new HashMap<>();
        MessageUtil.addToAttributeMap(msgAttrMap, "action", txnData.getAction());
        MessageUtil.addToAttributeMap(msgAttrMap, "space", spaceId);
        MessageUtil.addToAttributeMap(msgAttrMap, "featureId", txnData.getFeatureId());
        // Add other custom attributes
        MessageUtil.addCustomFieldsToAttributeMap(msgAttrMap, sub, txnData.getJsonData());
        return msgAttrMap;
    }
}