From 7bf0dc06db90106228984b4fa5ac477b254607f9 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Tue, 5 Aug 2025 15:35:23 -0500 Subject: [PATCH] Performance improvements of acknowledgements manager in DDB source Signed-off-by: Jonah Calvo --- .../stream/ShardAcknowledgementManager.java | 27 ++++++++++++------- .../source/dynamodb/stream/ShardConsumer.java | 9 +++---- .../ShardAcknowledgementManagerTest.java | 3 +-- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index f4afc5243e..4ec0e0f712 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -46,7 +46,7 @@ public class ShardAcknowledgementManager { private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L; - static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2); + static final long CHECKPOINT_INTERVAL = Duration.ofMinutes(2).toMillis(); private final DynamoDBSourceConfig dynamoDBSourceConfig; private final Map> checkpoints = new ConcurrentHashMap<>(); @@ -61,7 +61,7 @@ public class ShardAcknowledgementManager { private final List partitionsToGiveUp; private boolean shutdownTriggered; - private Instant lastCheckpointTime; + private long lastCheckpointTime; public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager, final EnhancedSourceCoordinator sourceCoordinator, @@ -72,9 +72,9 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme this.sourceCoordinator = sourceCoordinator; this.dynamoDBSourceConfig = dynamoDBSourceConfig; this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor")); - this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>()); + this.partitionsToRemove = new ArrayList<>(); this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>()); - this.lastCheckpointTime = Instant.now(); + this.lastCheckpointTime = System.currentTimeMillis(); executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer)); } @@ -85,6 +85,12 @@ void monitorAcknowledgments(final Consumer stopWorkerConsumer) if (exit) { break; } + try { + // Idle between loops to save on CPU + Thread.sleep(300); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } LOG.info("Exiting acknowledgment manager"); @@ -189,14 +195,14 @@ public AcknowledgementSet createAcknowledgmentSet( } void updateOwnershipForAllShardPartitions() { - if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) { + if (System.currentTimeMillis() - lastCheckpointTime > CHECKPOINT_INTERVAL) { for (final StreamPartition streamPartition : checkpoints.keySet()) { if (!partitionsToRemove.contains(streamPartition)) { sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); } } - lastCheckpointTime = Instant.now(); + lastCheckpointTime = System.currentTimeMillis(); } } @@ -233,12 +239,13 @@ public void shutdown() { } private void removePartitions() { - partitionsToRemove.forEach(streamPartition -> { + for (final StreamPartition streamPartition : partitionsToRemove) { checkpoints.remove(streamPartition); ackStatuses.remove(streamPartition); - }); - - partitionsToRemove.clear(); + } + if (!partitionsToRemove.isEmpty()){ + partitionsToRemove.clear(); + } } public void giveUpPartition(final StreamPartition streamPartition) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index b632611865..673acc0be8 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -263,13 +263,12 @@ public void run() { LOG.debug("Reached end of shard"); break; } - - if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { - LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); - if (shardAcknowledgementManager == null) { + if (shardAcknowledgementManager == null) { + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); checkpointer.checkpoint(sequenceNumber); + lastCheckpointTime = System.currentTimeMillis(); } - lastCheckpointTime = System.currentTimeMillis(); } GetRecordsResponse response = callGetRecords(shardIterator); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index 9b13fe807a..9288fcb83a 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -23,7 +23,6 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import java.time.Duration; -import java.time.Instant; import java.util.Optional; import java.util.function.Consumer; @@ -107,7 +106,7 @@ void testUpdateOwnershipForAllShardPartitions() throws Exception { // Set lastCheckpointTime to past to trigger checkpoint interval setField(ShardAcknowledgementManager.class, shardAcknowledgementManager, - "lastCheckpointTime", Instant.now().minus(Duration.ofMinutes(5))); + "lastCheckpointTime", System.currentTimeMillis() - Duration.ofMinutes(5).toMillis()); // Call updateOwnershipForAllShardPartitions directly shardAcknowledgementManager.updateOwnershipForAllShardPartitions();