Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/reusable-build-and-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
id: save-artifact-name
run: echo "name=${{ env.GITHUB_CODEDEPLOY_ARTIFACT_NAME }}" >> "$GITHUB_OUTPUT"
- name: Save CodeDeploy artifact content
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ env.GITHUB_CODEDEPLOY_ARTIFACT_NAME }}
path: ${{ env.CODEDEPLOY_DIR }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/reusable-s3-codedeploy-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
# create directory to download artifacts
- run: mkdir download
- name: Fetch CodeDeploy artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: ${{ inputs.pipeline-artifact-name }}
path: download
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Naksha_1.1.3

- Increased DB Pool size of Naksha Admin DB from 10 to 25 to allow additional sequencer/publisher jobs to run in parallel.

## Naksha_1.1.2

- Fixed batch-size failure issue for SNS publishing where one of the message fails during publish, then we optimize by continuing from that failure instead of re-processing entire batch
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<version>1.1.2</version>
<version>1.1.3</version>
<packaging>pom</packaging>

<modules>
Expand Down
2 changes: 1 addition & 1 deletion xyz-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../</relativePath>
<version>1.1.2</version>
<version>1.1.3</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-hub-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.1.2</version>
<version>1.1.3</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-hub-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../</relativePath>
<version>1.1.2</version>
<version>1.1.3</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-models/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../</relativePath>
<version>1.1.2</version>
<version>1.1.3</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-psql-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../</relativePath>
<version>1.1.2</version>
<version>1.1.3</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion xyz-txn-handler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<groupId>com.here.xyz</groupId>
<artifactId>xyz-hub</artifactId>
<relativePath>../</relativePath>
<version>1.1.2</version>
<version>1.1.3</version>
</parent>

<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private static void readConfig() {
adminDBConnParams.setDbUrl(pubCfg.STORAGE_DB_URL);
adminDBConnParams.setUser(pubCfg.STORAGE_DB_USER);
adminDBConnParams.setPswd(pubCfg.STORAGE_DB_PASSWORD);
adminDBConnParams.setMaxPoolSize(25); // keeping higher value to allow parallel seq/publisher jobs
// Set AWS account access details
System.setProperty("aws.accessKeyId", (pubCfg.AWS_ACCESS_KEY_ID!=null) ? pubCfg.AWS_ACCESS_KEY_ID : "");
System.setProperty("aws.secretAccessKey", (pubCfg.AWS_SECRET_ACCESS_KEY!=null) ? pubCfg.AWS_SECRET_ACCESS_KEY : "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ public void run() {
return;
}
// Handover transactions to appropriate Publisher (e.g. DefaultSNSPublisher)
PublishEntryDTO lastTxn = null;
final PublishEntryDTO lastTxn = new PublishEntryDTO(lastTxnId, lastTxnRecId);
try {
lastTxn = PubUtil.getPubInstance(sub).publishTransactions(pubCfg, sub, txnList, lastTxnId, lastTxnRecId);
PubUtil.getPubInstance(sub).publishTransactions(pubCfg, sub, txnList, lastTxn);
} finally {
if (lastTxn != null) {
// Update last txn_id in AdminDB::xyz_config::xyz_txn_pub table
PubDatabaseHandler.saveLastTxnId(adminDBConnParams, subId, lastTxn);
}
// Update last txn_id in AdminDB::xyz_config::xyz_txn_pub table
PubDatabaseHandler.saveLastTxnId(adminDBConnParams, subId, lastTxn);
}
} catch (Exception ex) {
logger.error("{} - Exception in publisher job for subId={}, spaceId={}. ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ public class DefaultSNSBatchPublisher implements IPublisher {

// Convert and publish transactions to desired SNS Topic
@Override
public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub,
public void publishTransactions(final PubConfig pubCfg, final Subscription sub,
final List<PubTransactionData> txnList,
final long lastStoredTxnId, final long lastStoredTxnRecId) throws Exception {
final PublishEntryDTO pubDTO) throws Exception {
final String subId = sub.getId();
final String spaceId = sub.getSource();
final String snsTopic = PubUtil.getSnsTopicARN(sub);
final long lotStartTS = System.currentTimeMillis();
// local counters
final PublishEntryDTO pubDTO = new PublishEntryDTO(lastStoredTxnId, lastStoredTxnRecId);
int publishedRecCnt = 0;
// Variables for batch publish
final int TXN_LIST_SIZE = txnList.size();
Expand Down Expand Up @@ -64,15 +63,8 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri
final int msgLength = msg.length();
final Map<String, MessageAttributeValue> msgAttrMap = populateMessageAttributeMap(txnData, sub, spaceId);

// Prepare PublishBatchEntry for current message
final PublishBatchRequestEntry batchEntry = PublishBatchRequestEntry.builder()
.message(msg)
.messageAttributes(msgAttrMap)
.id(MSG_ID_PREFIX + batchEntryCounter)
.build();

// publish batch if payload limit reached
if (msgLength+aggrBatchPayloadSize > MAX_ALLOWED_PAYLOAD_SIZE && batchEntries.size()>0) {
// publish accumulated batch if payload limit reached
if (msgLength+aggrBatchPayloadSize > MAX_ALLOWED_PAYLOAD_SIZE && !batchEntries.isEmpty()) {
// publish current batch
publishBatchEntriesAndCheckResult(batchEntries, snsTopic, snsClient, txnList, subId, publishedRecCnt, MSG_ID_PREFIX, pubCfg, pubDTO);
// update batch variables
Expand All @@ -82,6 +74,12 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri
batchEntryCounter = 0;
}

// Prepare PublishBatchEntry for current message
final PublishBatchRequestEntry batchEntry = PublishBatchRequestEntry.builder()
.message(msg)
.messageAttributes(msgAttrMap)
.id(MSG_ID_PREFIX + batchEntryCounter)
.build();
// add current message to the batch
batchEntries.add(batchEntry);
aggrBatchPayloadSize += msgLength;
Expand All @@ -106,8 +104,6 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri
logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}",
snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId());
}

return pubDTO;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ public class DefaultSNSSinglePublisher implements IPublisher {

// Convert and publish transactions to desired SNS Topic
@Override
public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub,
public void publishTransactions(final PubConfig pubCfg, final Subscription sub,
final List<PubTransactionData> txnList,
final long lastStoredTxnId, final long lastStoredTxnRecId) throws Exception {
final PublishEntryDTO pubDTO) throws Exception {
final String subId = sub.getId();
final String spaceId = sub.getSource();
final String snsTopic = PubUtil.getSnsTopicARN(sub);
final long lotStartTS = System.currentTimeMillis();
// local counters
final PublishEntryDTO pubDTO = new PublishEntryDTO(lastStoredTxnId, lastStoredTxnRecId);
long publishedRecCnt = 0;

try {
Expand Down Expand Up @@ -82,8 +81,6 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri
logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}",
snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId());
}

return pubDTO;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.util.List;

public interface IPublisher {
PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub,
void publishTransactions(final PubConfig pubCfg, final Subscription sub,
final List<PubTransactionData> txnList,
final long lastTxnId, final long lastTxnRecId) throws Exception;
final PublishEntryDTO pubDTO) throws Exception;

}