diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index d0259a8b15..f6633402cd 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -320,7 +320,12 @@ public boolean isExportDone(StreamPartition streamPartition) { } void startUpdatingOwnershipForShard(final StreamPartition streamPartition) { - checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()); + lock.lock(); + try { + checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()); + } finally { + lock.unlock(); + } } boolean isStillTrackingShard(final StreamPartition streamPartition) {