Skip to content

Commit f351460

Browse files
committed
Add thread-safe synchronization to startUpdatingOwnershipForShard
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent f2512e0 commit f351460

1 file changed

Lines changed: 6 additions & 1 deletion

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,12 @@ public boolean isExportDone(StreamPartition streamPartition) {
320320
}
321321

322322
void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
323-
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>());
323+
lock.lock();
324+
try {
325+
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>());
326+
} finally {
327+
lock.unlock();
328+
}
324329
}
325330

326331
boolean isStillTrackingShard(final StreamPartition streamPartition) {

0 commit comments

Comments
 (0)