From f35146091b8fe1fd48aa3a2c9a0d72b6aa0ea604 Mon Sep 17 00:00:00 2001 From: Divyansh Bokadia Date: Fri, 23 Jan 2026 12:06:26 -0600 Subject: [PATCH] Add thread-safe synchronization to startUpdatingOwnershipForShard Signed-off-by: Divyansh Bokadia --- .../dynamodb/stream/ShardAcknowledgementManager.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index d0259a8b15..f6633402cd 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -320,7 +320,12 @@ public boolean isExportDone(StreamPartition streamPartition) { } void startUpdatingOwnershipForShard(final StreamPartition streamPartition) { - checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()); + lock.lock(); + try { + checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()); + } finally { + lock.unlock(); + } } boolean isStillTrackingShard(final StreamPartition streamPartition) {