From 2fb502faa79760a217acc6c633d9f251a18af005 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Wed, 25 Jun 2025 02:41:06 -0500 Subject: [PATCH 01/14] Implement checkpointing of dynamodb stream source Signed-off-by: Jonah Calvo --- .../dynamodb/model/ShardCheckpointStatus.java | 68 ++++++ .../stream/ShardAcknowledgementManager.java | 231 ++++++++++++++++++ .../source/dynamodb/stream/ShardConsumer.java | 44 ++-- .../dynamodb/stream/ShardConsumerFactory.java | 8 +- .../dynamodb/stream/StreamScheduler.java | 53 ++-- .../ShardAcknowledgementManagerTest.java | 83 +++++++ .../stream/ShardConsumerFactoryTest.java | 12 +- .../dynamodb/stream/ShardConsumerTest.java | 47 +--- .../dynamodb/stream/StreamSchedulerTest.java | 8 +- 9 files changed, 453 insertions(+), 101 deletions(-) create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java create mode 100644 data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java new file mode 100644 index 0000000000..d8b0208788 --- /dev/null +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.dynamodb.model; + +import java.time.Duration; +import java.time.Instant; + +public class ShardCheckpointStatus { + private final String sequenceNumber; + + private final boolean isFinalAcknowledgmentForPartition; + private AcknowledgmentStatus acknowledgeStatus; + private final long createTimestamp; + private Long acknowledgedTimestamp; + + public enum AcknowledgmentStatus { + POSITIVE_ACK, + NEGATIVE_ACK, + NO_ACK + } + + public ShardCheckpointStatus(final String sequenceNumber, final long createTimestamp, final boolean isFinalAcknowledgmentForPartition) { + this.sequenceNumber = sequenceNumber; + this.acknowledgeStatus = AcknowledgmentStatus.NO_ACK; + this.createTimestamp = createTimestamp; + this.isFinalAcknowledgmentForPartition = isFinalAcknowledgmentForPartition; + } + + public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) { + this.acknowledgedTimestamp = acknowledgedTimestamp; + } + + public void setAcknowledged(final AcknowledgmentStatus acknowledgmentStatus) { + this.acknowledgeStatus = acknowledgmentStatus; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public boolean isPositiveAcknowledgement() { + return this.acknowledgeStatus == AcknowledgmentStatus.POSITIVE_ACK; + } + + public boolean isNegativeAcknowledgement() { + return this.acknowledgeStatus == AcknowledgmentStatus.NEGATIVE_ACK; + } + + public long getCreateTimestamp() { + return createTimestamp; + } + + public long getAcknowledgedTimestamp() { + return acknowledgedTimestamp; + } + + public boolean isFinalAcknowledgmentForPartition() { + return isFinalAcknowledgmentForPartition; + } + + public boolean isExpired(final Duration expiredDuration) { + return Duration.between(Instant.ofEpochMilli(createTimestamp), Instant.now()).compareTo(expiredDuration) > 0; + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java new file mode 100644 index 0000000000..f08aefeae9 --- /dev/null +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -0,0 +1,231 @@ +package org.opensearch.dataprepper.plugins.source.dynamodb.stream; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; + +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.opensearch.dataprepper.plugins.source.dynamodb.model.ShardCheckpointStatus; + +public class ShardAcknowledgementManager { + private static final Logger LOG = LoggerFactory.getLogger(ShardAcknowledgementManager.class); + + private static final String NULL_SEQUENCE_NUMBER = "null"; + + private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L; + + static final Duration CHECKPOINT_INTERVAL = Duration.ofSeconds(10); + + private final DynamoDBSourceConfig dynamoDBSourceConfig; + private final Map> checkpoints = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> ackStatus = new ConcurrentHashMap<>(); + + private final AcknowledgementSetManager acknowledgementSetManager; + + private final EnhancedSourceCoordinator sourceCoordinator; + + private final ExecutorService executorService; + private final List partitionsToRemove; + private boolean shutdownTriggered; + + private Instant lastCheckpointTime; + + public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager, + final EnhancedSourceCoordinator sourceCoordinator, + final DynamoDBSourceConfig dynamoDBSourceConfig + ) { + this.acknowledgementSetManager = acknowledgementSetManager; + this.sourceCoordinator = sourceCoordinator; + this.dynamoDBSourceConfig = dynamoDBSourceConfig; + this.executorService = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "dynamodb-shard-ack-monitor"); + t.setDaemon(true); + return t; + }); + this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>()); + this.lastCheckpointTime = Instant.now(); + } + + public void init(final Consumer stopWorkerConsumer) { + executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer)); + } + + void monitorAcknowledgments(final Consumer stopWorkerConsumer) { + while (!Thread.currentThread().isInterrupted()) { + boolean exit = runMonitorAcknowledgmentLoop(stopWorkerConsumer); + if (exit) { + break; + } + } + + LOG.info("Exiting acknowledgment manager"); + } + + boolean runMonitorAcknowledgmentLoop(final Consumer stopWorkerConsumer) { + removePartitions(); + if (shutdownTriggered && checkpoints.isEmpty()) { + LOG.info("Shutdown was triggered and not waiting on any acknowledgments, exiting cleanly"); + return true; + } + + for (final StreamPartition streamPartition : checkpoints.keySet()) { + try { + final StreamProgressState streamProgressState = streamPartition.getProgressState().orElseThrow(); + final ConcurrentLinkedQueue checkpointStatuses = checkpoints.get(streamPartition); + ShardCheckpointStatus latestCheckpointForShard = null; + boolean gaveUpPartition = false; + while (!checkpointStatuses.isEmpty()) { + updateOwnershipForAllSegmentPartitions(); + + if (checkpointStatuses.peek().isPositiveAcknowledgement()) { + latestCheckpointForShard = checkpointStatuses.poll(); + } else if (checkpointStatuses.peek().isNegativeAcknowledgement() + || checkpointStatuses.peek().isExpired(dynamoDBSourceConfig.getShardAcknowledgmentTimeout())) { + handleFailure(streamPartition, streamProgressState, latestCheckpointForShard); + gaveUpPartition = true; + + if (checkpointStatuses.peek().isNegativeAcknowledgement()) { + LOG.warn("Received negative acknowledgment for partition {} with sequence number {}, giving up partition", + streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber()); + } else { + LOG.warn("Acknowledgment timed out for partition {} with sequence number {}, giving up partition", + streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber()); + } + + stopWorkerConsumer.accept(streamPartition); + break; + } else { + break; + } + } + + if (!gaveUpPartition) { + updateOwnershipForAllSegmentPartitions(); + } + + if (gaveUpPartition || latestCheckpointForShard == null) { + continue; + } + + if (latestCheckpointForShard.isFinalAcknowledgmentForPartition()) { + handleCompletedSegment(streamPartition); + } else { + streamProgressState.setSequenceNumber(Objects.equals(latestCheckpointForShard.getSequenceNumber(), NULL_SEQUENCE_NUMBER) ? null : latestCheckpointForShard.getSequenceNumber()); + sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber()); + } + } catch (final Exception e) { + LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e); + } + } + + return false; + } + + public AcknowledgementSet createAcknowledgmentSet( + final StreamPartition streamPartition, + final String sequenceNumber, + final boolean isFinalSetForPartition) { + final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber; + final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus(sequenceNumber, Instant.now().toEpochMilli(), isFinalSetForPartition); + checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()).add(shardCheckpointStatus); + ackStatus.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>()); + ackStatus.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus); + + return acknowledgementSetManager.create((result) -> { + if (ackStatus.containsKey(streamPartition) && ackStatus.get(streamPartition).containsKey(sequenceNumberNoNull)) { + final ShardCheckpointStatus ackCheckpointStatus = ackStatus.get(streamPartition).get(sequenceNumberNoNull); + + ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli()); + + if (result) { + LOG.debug("Received acknowledgment of completion from sink for partition {} with sequence number {}", + streamPartition.getPartitionKey(), sequenceNumberNoNull); + ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK); + } else { + LOG.warn("Negative acknowledgment received for partition {} with sequence number {}", + streamPartition.getPartitionKey(), sequenceNumberNoNull); + ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK); + } + } + }, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + } + + void updateOwnershipForAllSegmentPartitions() { + if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) { + for (final StreamPartition streamPartition : checkpoints.keySet()) { + if (!partitionsToRemove.contains(streamPartition)) { + sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + } + } + + lastCheckpointTime = Instant.now(); + } + } + + private void handleFailure(final StreamPartition streamPartition, + final StreamProgressState streamProgressState, + final ShardCheckpointStatus latestCheckpointForShard) { + if (latestCheckpointForShard != null) { + streamProgressState.setSequenceNumber(latestCheckpointForShard.getSequenceNumber()); + sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + } + partitionsToRemove.add(streamPartition); + sourceCoordinator.giveUpPartition(streamPartition); + } + + private void handleCompletedSegment(final StreamPartition streamPartition) { + sourceCoordinator.completePartition(streamPartition); + partitionsToRemove.add(streamPartition); + LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey()); + } + + public void shutdown() { + shutdownTriggered = true; + executorService.shutdown(); + try { + if (!executorService.awaitTermination(WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private void removePartitions() { + partitionsToRemove.forEach(streamPartition -> { + checkpoints.remove(streamPartition); + ackStatus.remove(streamPartition); + }); + + partitionsToRemove.clear(); + } + + public boolean isExportDone(StreamPartition streamPartition) { + Optional globalPartition = sourceCoordinator.getPartition(streamPartition.getStreamArn()); + return globalPartition.isPresent(); + } +} diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index f3297671d8..067b19694c 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; @@ -94,7 +95,11 @@ public class ShardConsumer implements Runnable { private final StreamRecordConverter recordConverter; - private final StreamCheckpointer checkpointer; +// private final StreamCheckpointer checkpointer; + + private final ShardAcknowledgementManager shardAcknowledgementManager; + + private final StreamPartition streamPartition; private String shardIterator; @@ -119,7 +124,9 @@ public class ShardConsumer implements Runnable { private ShardConsumer(Builder builder) { this.shardProgress = builder.pluginMetrics.counter(SHARD_PROGRESS); this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient; - this.checkpointer = builder.checkpointer; +// this.checkpointer = builder.checkpointer; + this.shardAcknowledgementManager = builder.shardAcknowledgementManager; + this.streamPartition = builder.streamPartition; this.shardIterator = builder.shardIterator; this.lastShardIterator = builder.lastShardIterator; // Introduce an overlap @@ -155,7 +162,11 @@ static class Builder { private TableInfo tableInfo; - private StreamCheckpointer checkpointer; +// private StreamCheckpointer checkpointer; + + private ShardAcknowledgementManager shardAcknowledgementManager; + + private StreamPartition streamPartition; private String shardIterator; @@ -194,8 +205,18 @@ public Builder shardId(final String shardId) { return this; } - public Builder checkpointer(StreamCheckpointer checkpointer) { - this.checkpointer = checkpointer; +// public Builder checkpointer(StreamCheckpointer checkpointer) { +// this.checkpointer = checkpointer; +// return this; +// } + + public Builder shardAcknowledgementManager(ShardAcknowledgementManager shardAcknowledgementManager) { + this.shardAcknowledgementManager = shardAcknowledgementManager; + return this; + } + + public Builder streamPartition(StreamPartition streamPartition) { + this.streamPartition = streamPartition; return this; } @@ -243,7 +264,6 @@ public void run() { if (shouldSkip()) { shardProgress.increment(); if (acknowledgementSet != null) { - checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); acknowledgementSet.complete(); } return; @@ -263,17 +283,11 @@ public void run() { if (shardIterator == null) { // End of Shard LOG.debug("Reached end of shard"); - checkpointer.checkpoint(sequenceNumber); break; } if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); - if (acknowledgementSet != null) { - checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); - } else { - checkpointer.checkpoint(sequenceNumber); - } lastCheckpointTime = System.currentTimeMillis(); } @@ -289,7 +303,6 @@ public void run() { continue; } if (waitForExport) { - checkpointer.checkpoint(sequenceNumber); waitForExport(); waitForExport = false; } @@ -319,12 +332,10 @@ public void run() { if (shouldStop) { // Do last checkpoint and then quit LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId); - checkpointer.checkpoint(sequenceNumber); throw new RuntimeException("Consuming shard was interrupted from shutdown"); } if (acknowledgementSet != null) { - checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); acknowledgementSet.complete(); } @@ -366,7 +377,7 @@ private GetRecordsResponse callGetRecords(String shardIterator) { private void waitForExport() { LOG.debug("Start waiting for export to be done and loaded"); int numberOfWaits = 0; - while (!checkpointer.isExportDone()) { + while (!shardAcknowledgementManager.isExportDone(streamPartition)) { LOG.debug("Export is in progress, wait..."); try { shardProgress.increment(); @@ -377,7 +388,6 @@ private void waitForExport() { numberOfWaits++; if (numberOfWaits % DEFAULT_WAIT_COUNT_TO_CHECKPOINT == 0) { // To extend the timeout of lease - checkpointer.checkpoint(null); } } catch (InterruptedException e) { LOG.error("Wait for export is interrupted ({})", e.getMessage()); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index 0a72133032..2a2df78141 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -65,7 +65,8 @@ public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordi public Runnable createConsumer(final StreamPartition streamPartition, final AcknowledgementSet acknowledgementSet, - final Duration shardAcknowledgmentTimeout) { + final Duration shardAcknowledgmentTimeout, + final ShardAcknowledgementManager shardAcknowledgementManager) { LOG.info("Starting to consume shard " + streamPartition.getShardId()); @@ -95,7 +96,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, return null; } - StreamCheckpointer checkpointer = new StreamCheckpointer(enhancedSourceCoordinator, streamPartition); + String tableArn = TableUtil.getTableArnFromStreamArn(streamPartition.getStreamArn()); TableInfo tableInfo = getTableInfo(tableArn); @@ -103,7 +104,8 @@ public Runnable createConsumer(final StreamPartition streamPartition, LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator); ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig) .tableInfo(tableInfo) - .checkpointer(checkpointer) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) .shardIterator(shardIterator) .shardId(streamPartition.getShardId()) .lastShardIterator(lastShardIterator) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index bd58e9e0d5..8a26f429e2 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; - /** * A scheduler to manage all the stream related work in one place */ @@ -58,6 +57,8 @@ public class StreamScheduler implements Runnable { private final BackoffCalculator backoffCalculator; private int noAvailableShardsCount = 0; + private final ShardAcknowledgementManager shardAcknowledgementManager; + public StreamScheduler(final EnhancedSourceCoordinator coordinator, final ShardConsumerFactory consumerFactory, @@ -71,6 +72,8 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator, this.acknowledgementSetManager = acknowledgementSetManager; this.dynamoDBSourceConfig = dynamoDBSourceConfig; this.backoffCalculator = backoffCalculator; + this.shardAcknowledgementManager = new ShardAcknowledgementManager(acknowledgementSetManager, coordinator, dynamoDBSourceConfig); + this.shardAcknowledgementManager.init(coordinator::giveUpPartition); executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong()); @@ -80,40 +83,17 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator, private void processStreamPartition(StreamPartition streamPartition) { final boolean acknowledgmentsEnabled = dynamoDBSourceConfig.isAcknowledgmentsEnabled(); AcknowledgementSet acknowledgementSet = null; - if (acknowledgmentsEnabled) { - acknowledgementSet = acknowledgementSetManager.create((result) -> { - if (result) { - LOG.info("Received acknowledgment of completion from sink for shard {}", streamPartition.getShardId()); - completeConsumer(streamPartition).accept(null, null); - } else { - LOG.warn("Negative acknowledgment received for shard {}, it will be retried", streamPartition.getShardId()); - coordinator.giveUpPartition(streamPartition); - } - }, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + final StreamProgressState lastProgressState = streamPartition.getProgressState().orElseThrow(); + final String endingSequenceNumber = lastProgressState.getEndingSequenceNumber(); + acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, lastProgressState.getSequenceNumber(), endingSequenceNumber == null || endingSequenceNumber.isEmpty()); } - Runnable shardConsumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + Runnable shardConsumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, dynamoDBSourceConfig.getShardAcknowledgmentTimeout(), shardAcknowledgementManager); if (shardConsumer != null) { CompletableFuture runConsumer = CompletableFuture.runAsync(shardConsumer, executor); - - if (acknowledgmentsEnabled) { - runConsumer.whenComplete((v, ex) -> { - numOfWorkers.decrementAndGet(); - if (ex != null) { - LOG.error(NOISY, "Received exception while processing shard {}, giving up this shard for reprocessing: {}", - streamPartition.getShardId(), ex); - coordinator.giveUpPartition(streamPartition); - } - if (numOfWorkers.get() == 0) { - activeChangeEventConsumers.decrementAndGet(); - } - shardsInProcessing.decrementAndGet(); - }); - } else { - runConsumer.whenComplete(completeConsumer(streamPartition)); - } + runConsumer.whenComplete(completeConsumer(streamPartition)); numOfWorkers.incrementAndGet(); if (numOfWorkers.get() % 10 == 0) { SHARD_COUNT_LOGGER.info("Actively processing {} shards", numOfWorkers.get()); @@ -172,23 +152,18 @@ public void run() { private BiConsumer completeConsumer(StreamPartition streamPartition) { return (v, ex) -> { - if (!dynamoDBSourceConfig.isAcknowledgmentsEnabled()) { - numOfWorkers.decrementAndGet(); - if (numOfWorkers.get() == 0) { - activeChangeEventConsumers.decrementAndGet(); - } - shardsInProcessing.decrementAndGet(); + numOfWorkers.decrementAndGet(); + if (numOfWorkers.get() == 0) { + activeChangeEventConsumers.decrementAndGet(); } + shardsInProcessing.decrementAndGet(); if (ex == null) { LOG.info("Shard consumer for {} is completed", streamPartition.getShardId()); coordinator.completePartition(streamPartition); } else { - // Do nothing - // The consumer must have already done one last checkpointing. LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex); coordinator.giveUpPartition(streamPartition); } }; } - } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java new file mode 100644 index 0000000000..ed8edf2c0c --- /dev/null +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -0,0 +1,83 @@ +package org.opensearch.dataprepper.plugins.source.dynamodb.stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; + +import java.time.Duration; +import java.util.Optional; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class ShardAcknowledgementManagerTest { + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private DynamoDBSourceConfig dynamoDBSourceConfig; + + @Mock + private StreamPartition streamPartition; + + @Mock + private StreamProgressState streamProgressState; + + @Mock + private AcknowledgementSet acknowledgementSet; + + @Mock + private Consumer stopWorkerConsumer; + + private ShardAcknowledgementManager shardAcknowledgementManager; + + @BeforeEach + void setUp() { + when(dynamoDBSourceConfig.getShardAcknowledgmentTimeout()).thenReturn(Duration.ofMinutes(15)); + shardAcknowledgementManager = new ShardAcknowledgementManager( + acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig); + } + + @Test + void testCreateAcknowledgmentSet() { + when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))) + .thenReturn(acknowledgementSet); + + AcknowledgementSet result = shardAcknowledgementManager.createAcknowledgmentSet( + streamPartition, "seq123", false); + + assertNotNull(result); + verify(acknowledgementSetManager).create(any(Consumer.class), eq(Duration.ofMinutes(15))); + } + + @Test + void testIsExportDone() { + when(streamPartition.getStreamArn()).thenReturn("stream-arn"); + when(sourceCoordinator.getPartition("stream-arn")).thenReturn(Optional.of(streamPartition)); + + boolean result = shardAcknowledgementManager.isExportDone(streamPartition); + + assertTrue(result); + } + + @Test + void testShutdown() { + assertDoesNotThrow(() -> shardAcknowledgementManager.shutdown()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index 6d503f7a19..8aff55a48a 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -116,7 +116,8 @@ public void test_create_shardConsumer_correctly() { streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); + ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); assertThat(consumer, notNullValue()); verify(dynamoDbStreamsClient).getShardIterator(any(GetShardIteratorRequest.class)); @@ -133,7 +134,8 @@ public void test_create_shardConsumer_for_closedShards() { streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); + ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); assertThat(consumer, notNullValue()); // Should get iterators twice verify(dynamoDbStreamsClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class)); @@ -154,7 +156,8 @@ void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exce when(dynamoDBSourceAggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); + ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); assertThat(consumer, nullValue()); verify(stream5xxErrors).increment(); verify(streamApiInvocations).increment(); @@ -172,7 +175,8 @@ void stream4xxErrors_is_incremented_when_get_shard_iterator_throws_dynamodb_exce when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); + ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); assertThat(consumer, nullValue()); verify(stream4xxErrors).increment(); verify(streamApiInvocations).increment(); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 8d7fd97e0d..8b8dd1c5db 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -63,7 +63,6 @@ import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT; import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE; import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.SHARD_PROGRESS; -import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE; @ExtendWith(MockitoExtension.class) class ShardConsumerTest { @@ -110,14 +109,13 @@ class ShardConsumerTest { @Mock private StreamConfig streamConfig; - - private StreamCheckpointer checkpointer; + @Mock + private ShardAcknowledgementManager shardAcknowledgementManager; private StreamPartition streamPartition; private TableInfo tableInfo; - private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; @@ -134,7 +132,6 @@ class ShardConsumerTest { private final int total = random.nextInt(10) + 1; - @BeforeEach void setup() throws Exception { @@ -143,7 +140,6 @@ void setup() throws Exception { state.setStartTime(Instant.now().toEpochMilli()); streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); - // Mock Global Table Info lenient().when(coordinator.getPartition(tableArn)).thenReturn(Optional.of(tableInfoGlobalState)); TableMetadata metadata = TableMetadata.builder() @@ -157,14 +153,12 @@ void setup() throws Exception { lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), any(Duration.class)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); lenient().doNothing().when(bufferAccumulator).add(any(org.opensearch.dataprepper.model.record.Record.class)); lenient().doNothing().when(bufferAccumulator).flush(); - checkpointer = new StreamCheckpointer(coordinator, streamPartition); - List records = buildRecords(total); GetRecordsResponse response = GetRecordsResponse.builder() .records(records) @@ -180,7 +174,6 @@ void setup() throws Exception { when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); } - @Test void test_run_shardConsumer_correctly() throws Exception { ShardConsumer shardConsumer; @@ -190,7 +183,8 @@ void test_run_shardConsumer_correctly() throws Exception { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) - .checkpointer(checkpointer) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) .tableInfo(tableInfo) .startTime(null) .waitForExport(false) @@ -199,15 +193,9 @@ void test_run_shardConsumer_correctly() throws Exception { shardConsumer.run(); - // Should call GetRecords verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); - - // Should write to buffer verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); verify(bufferAccumulator).flush(); - // Should complete the consumer as reach to end of shard - verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE)); - verify(streamApiInvocations).increment(); verify(shardProgress).increment(); } @@ -224,7 +212,8 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) - .checkpointer(checkpointer) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) .tableInfo(tableInfo) .startTime(null) .acknowledgmentSetTimeout(acknowledgmentTimeout) @@ -242,19 +231,10 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { progressCheckConsumer.accept(mock(ProgressCheck.class)); verify(acknowledgementSet).increaseExpiry(any(Duration.class)); - - // Should call GetRecords verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); - - // Should write to buffer verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); verify(bufferAccumulator).flush(); - - // Should complete the consumer as reach to end of shard - verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE)); - verify(acknowledgementSet).complete(); - verify(streamApiInvocations).increment(); verify(shardProgress).increment(); } @@ -274,7 +254,8 @@ void test_run_shardConsumer_with_acknowledgments_and_error_cancels_acknowledgmen bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) - .checkpointer(checkpointer) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) .tableInfo(tableInfo) .startTime(null) .acknowledgmentSetTimeout(acknowledgmentTimeout) @@ -292,7 +273,6 @@ void test_run_shardConsumer_with_acknowledgments_and_error_cancels_acknowledgmen progressCheckConsumer.accept(mock(ProgressCheck.class)); verify(acknowledgementSet).increaseExpiry(any(Duration.class)); - verify(acknowledgementSet).cancel(); } @@ -305,7 +285,8 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) - .checkpointer(checkpointer) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) .tableInfo(tableInfo) .startTime(null) .waitForExport(false) @@ -329,7 +310,8 @@ void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) - .checkpointer(checkpointer) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) .tableInfo(tableInfo) .startTime(null) .waitForExport(false) @@ -344,9 +326,6 @@ void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { verify(streamApiInvocations).increment(); } - /** - * Helper function to generate some data. - */ private List buildRecords(int count) { List records = new ArrayList<>(); for (int i = 0; i < count; i++) { diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index 5928442dbb..8e85da2dcd 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -126,7 +126,7 @@ public void test_normal_run() throws InterruptedException { when(backoffCalculator.calculateBackoffToAcquireNextShard(eq(1), any(AtomicInteger.class))) .thenReturn(10000L); - when(consumerFactory.createConsumer(any(StreamPartition.class), eq(null), any(Duration.class))).thenReturn(() -> LOG.info("Hello")); + when(consumerFactory.createConsumer(any(StreamPartition.class), eq(null), any(Duration.class), any(ShardAcknowledgementManager.class))).thenReturn(() -> LOG.info("Hello")); when(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.of(streamPartition)).thenReturn(Optional.empty()); scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator); @@ -141,7 +141,7 @@ public void test_normal_run() throws InterruptedException { // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); // Should start a new consumer - verify(consumerFactory).createConsumer(any(StreamPartition.class), eq(null), any(Duration.class)); + verify(consumerFactory).createConsumer(any(StreamPartition.class), eq(null), any(Duration.class), any(ShardAcknowledgementManager.class)); // Should mask the stream partition as completed. verify(coordinator).completePartition(any(StreamPartition.class)); @@ -173,7 +173,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException { return acknowledgementSet; }).when(acknowledgementSetManager).create(any(Consumer.class), eq(shardAcknowledgmentTimeout)); - when(consumerFactory.createConsumer(any(StreamPartition.class), eq(acknowledgementSet), eq(shardAcknowledgmentTimeout))).thenReturn(() -> LOG.info("Hello")); + when(consumerFactory.createConsumer(any(StreamPartition.class), eq(acknowledgementSet), eq(shardAcknowledgmentTimeout), any(ShardAcknowledgementManager.class))).thenReturn(() -> LOG.info("Hello")); scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator); @@ -188,7 +188,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException { // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); // Should start a new consumer - verify(consumerFactory).createConsumer(any(StreamPartition.class), any(AcknowledgementSet.class), any(Duration.class)); + verify(consumerFactory).createConsumer(any(StreamPartition.class), any(AcknowledgementSet.class), any(Duration.class), any(ShardAcknowledgementManager.class)); // Should mask the stream partition as completed. verify(coordinator).completePartition(any(StreamPartition.class)); From f73ecbcd7b79709c972c1c53bfb2694367c37ba3 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Wed, 25 Jun 2025 04:36:20 -0500 Subject: [PATCH 02/14] Fix unit tests Signed-off-by: Jonah Calvo --- .../source/dynamodb/stream/StreamScheduler.java | 4 +++- .../stream/ShardAcknowledgementManagerTest.java | 9 ++++++--- .../source/dynamodb/stream/ShardConsumerTest.java | 11 ++++++----- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index 8a26f429e2..550166590a 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -159,7 +159,9 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) { shardsInProcessing.decrementAndGet(); if (ex == null) { LOG.info("Shard consumer for {} is completed", streamPartition.getShardId()); - coordinator.completePartition(streamPartition); + if (!dynamoDBSourceConfig.isAcknowledgmentsEnabled()) { + coordinator.completePartition(streamPartition); + } } else { LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex); coordinator.giveUpPartition(streamPartition); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index ed8edf2c0c..de2f7d316e 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -16,10 +16,13 @@ import java.util.Optional; import java.util.function.Consumer; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) class ShardAcknowledgementManagerTest { @@ -49,13 +52,13 @@ class ShardAcknowledgementManagerTest { @BeforeEach void setUp() { - when(dynamoDBSourceConfig.getShardAcknowledgmentTimeout()).thenReturn(Duration.ofMinutes(15)); shardAcknowledgementManager = new ShardAcknowledgementManager( acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig); } @Test void testCreateAcknowledgmentSet() { + when(dynamoDBSourceConfig.getShardAcknowledgmentTimeout()).thenReturn(Duration.ofMinutes(15)); when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))) .thenReturn(acknowledgementSet); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 8b8dd1c5db..e5af69c08d 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -52,7 +52,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -171,7 +170,10 @@ void setup() throws Exception { given(pluginMetrics.counter("changeEventsProcessingErrors")).willReturn(testCounter); given(pluginMetrics.summary(anyString())).willReturn(testSummary); - when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); + lenient().when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); + + + lenient().when(shardAcknowledgementManager.isExportDone(any(StreamPartition.class))).thenReturn(true); } @Test @@ -244,7 +246,6 @@ void test_run_shardConsumer_with_acknowledgments_and_error_cancels_acknowledgmen final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); final Duration acknowledgmentTimeout = Duration.ofSeconds(30); - when(aggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(InternalServerErrorException.class); ShardConsumer shardConsumer; @@ -279,7 +280,6 @@ void test_run_shardConsumer_with_acknowledgments_and_error_cancels_acknowledgmen @Test void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { ShardConsumer shardConsumer; - when(aggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); try ( final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); @@ -294,6 +294,7 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { } when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(InternalServerErrorException.class); + when(aggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); assertThrows(RuntimeException.class, shardConsumer::run); @@ -304,7 +305,6 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { @Test void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { ShardConsumer shardConsumer; - when(aggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); try ( final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); @@ -319,6 +319,7 @@ void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { } when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(DynamoDbException.class); + when(aggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); assertThrows(RuntimeException.class, shardConsumer::run); From 28b31acf2e61f56b86a4b1af1a1f3a0dc944e71f Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Thu, 17 Jul 2025 17:08:32 -0500 Subject: [PATCH 03/14] Do acknowledgements in batches per shard Signed-off-by: Jonah Calvo --- .../dynamodb-source/build.gradle | 1 + .../stream/ShardAcknowledgementManager.java | 34 +-- .../source/dynamodb/stream/ShardConsumer.java | 175 +++++------- .../dynamodb/stream/ShardConsumerFactory.java | 10 +- .../dynamodb/stream/StreamScheduler.java | 21 +- .../stream/ShardConsumerFactoryTest.java | 9 +- .../dynamodb/stream/ShardConsumerTest.java | 266 ++++++++++-------- .../dynamodb/stream/StreamSchedulerTest.java | 19 +- 8 files changed, 256 insertions(+), 279 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index 3b3046434a..056691d7b2 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -23,6 +23,7 @@ dependencies { implementation project(path: ':data-prepper-plugins:aws-plugin-api') implementation project(path: ':data-prepper-plugins:buffer-common') + implementation project(path: ':data-prepper-plugins:common') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index f08aefeae9..c86f9333e4 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -3,7 +3,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; - +import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; @@ -36,11 +36,11 @@ public class ShardAcknowledgementManager { private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L; - static final Duration CHECKPOINT_INTERVAL = Duration.ofSeconds(10); + static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2); private final DynamoDBSourceConfig dynamoDBSourceConfig; private final Map> checkpoints = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> ackStatus = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> ackStatuses = new ConcurrentHashMap<>(); private final AcknowledgementSetManager acknowledgementSetManager; @@ -59,11 +59,7 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme this.acknowledgementSetManager = acknowledgementSetManager; this.sourceCoordinator = sourceCoordinator; this.dynamoDBSourceConfig = dynamoDBSourceConfig; - this.executorService = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "dynamodb-shard-ack-monitor"); - t.setDaemon(true); - return t; - }); + this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor")); this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>()); this.lastCheckpointTime = Instant.now(); } @@ -97,7 +93,7 @@ boolean runMonitorAcknowledgmentLoop(final Consumer stopWorkerC ShardCheckpointStatus latestCheckpointForShard = null; boolean gaveUpPartition = false; while (!checkpointStatuses.isEmpty()) { - updateOwnershipForAllSegmentPartitions(); + updateOwnershipForAllShardPartitions(); if (checkpointStatuses.peek().isPositiveAcknowledgement()) { latestCheckpointForShard = checkpointStatuses.poll(); @@ -122,7 +118,7 @@ boolean runMonitorAcknowledgmentLoop(final Consumer stopWorkerC } if (!gaveUpPartition) { - updateOwnershipForAllSegmentPartitions(); + updateOwnershipForAllShardPartitions(); } if (gaveUpPartition || latestCheckpointForShard == null) { @@ -130,7 +126,7 @@ boolean runMonitorAcknowledgmentLoop(final Consumer stopWorkerC } if (latestCheckpointForShard.isFinalAcknowledgmentForPartition()) { - handleCompletedSegment(streamPartition); + handleCompletedShard(streamPartition); } else { streamProgressState.setSequenceNumber(Objects.equals(latestCheckpointForShard.getSequenceNumber(), NULL_SEQUENCE_NUMBER) ? null : latestCheckpointForShard.getSequenceNumber()); sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); @@ -151,12 +147,12 @@ public AcknowledgementSet createAcknowledgmentSet( final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber; final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus(sequenceNumber, Instant.now().toEpochMilli(), isFinalSetForPartition); checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()).add(shardCheckpointStatus); - ackStatus.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>()); - ackStatus.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus); + ackStatuses.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>()); + ackStatuses.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus); return acknowledgementSetManager.create((result) -> { - if (ackStatus.containsKey(streamPartition) && ackStatus.get(streamPartition).containsKey(sequenceNumberNoNull)) { - final ShardCheckpointStatus ackCheckpointStatus = ackStatus.get(streamPartition).get(sequenceNumberNoNull); + if (ackStatuses.containsKey(streamPartition) && ackStatuses.get(streamPartition).containsKey(sequenceNumberNoNull)) { + final ShardCheckpointStatus ackCheckpointStatus = ackStatuses.get(streamPartition).get(sequenceNumberNoNull); ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli()); @@ -173,7 +169,7 @@ public AcknowledgementSet createAcknowledgmentSet( }, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); } - void updateOwnershipForAllSegmentPartitions() { + void updateOwnershipForAllShardPartitions() { if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) { for (final StreamPartition streamPartition : checkpoints.keySet()) { if (!partitionsToRemove.contains(streamPartition)) { @@ -196,7 +192,7 @@ private void handleFailure(final StreamPartition streamPartition, sourceCoordinator.giveUpPartition(streamPartition); } - private void handleCompletedSegment(final StreamPartition streamPartition) { + private void handleCompletedShard(final StreamPartition streamPartition) { sourceCoordinator.completePartition(streamPartition); partitionsToRemove.add(streamPartition); LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey()); @@ -206,7 +202,7 @@ public void shutdown() { shutdownTriggered = true; executorService.shutdown(); try { - if (!executorService.awaitTermination(WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT, TimeUnit.SECONDS)) { + if (!executorService.awaitTermination(WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT, TimeUnit.MINUTES)) { executorService.shutdownNow(); } } catch (InterruptedException e) { @@ -218,7 +214,7 @@ public void shutdown() { private void removePartitions() { partitionsToRemove.forEach(streamPartition -> { checkpoints.remove(streamPartition); - ackStatus.remove(streamPartition); + ackStatuses.remove(streamPartition); }); partitionsToRemove.clear(); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 067b19694c..f795a98183 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; @@ -36,10 +37,6 @@ public class ShardConsumer implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class); - private static final Duration ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME = Duration.ofMinutes(10); - - private static final Duration ACKNOWLEDGMENT_PROGRESS_CHECK_INTERVAL = Duration.ofMinutes(3); - /** * A flag to interrupt the process */ @@ -95,7 +92,7 @@ public class ShardConsumer implements Runnable { private final StreamRecordConverter recordConverter; -// private final StreamCheckpointer checkpointer; + private final StreamCheckpointer checkpointer; private final ShardAcknowledgementManager shardAcknowledgementManager; @@ -109,10 +106,6 @@ public class ShardConsumer implements Runnable { private boolean waitForExport; - private final AcknowledgementSet acknowledgementSet; - - private final Duration shardAcknowledgmentTimeout; - private final String shardId; private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; @@ -124,7 +117,7 @@ public class ShardConsumer implements Runnable { private ShardConsumer(Builder builder) { this.shardProgress = builder.pluginMetrics.counter(SHARD_PROGRESS); this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient; -// this.checkpointer = builder.checkpointer; + this.checkpointer = builder.checkpointer; this.shardAcknowledgementManager = builder.shardAcknowledgementManager; this.streamPartition = builder.streamPartition; this.shardIterator = builder.shardIterator; @@ -134,8 +127,6 @@ private ShardConsumer(Builder builder) { this.waitForExport = builder.waitForExport; final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics, builder.streamConfig); - this.acknowledgementSet = builder.acknowledgementSet; - this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout; this.shardId = builder.shardId; this.recordsWrittenToBuffer = 0; this.dynamoDBSourceAggregateMetrics = builder.dynamoDBSourceAggregateMetrics; @@ -162,7 +153,7 @@ static class Builder { private TableInfo tableInfo; -// private StreamCheckpointer checkpointer; + private StreamCheckpointer checkpointer; private ShardAcknowledgementManager shardAcknowledgementManager; @@ -178,9 +169,6 @@ static class Builder { private String shardId; - private AcknowledgementSet acknowledgementSet; - private Duration dataFileAcknowledgmentTimeout; - private StreamConfig streamConfig; public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, @@ -205,10 +193,10 @@ public Builder shardId(final String shardId) { return this; } -// public Builder checkpointer(StreamCheckpointer checkpointer) { -// this.checkpointer = checkpointer; -// return this; -// } + public Builder checkpointer(StreamCheckpointer checkpointer) { + this.checkpointer = checkpointer; + return this; + } public Builder shardAcknowledgementManager(ShardAcknowledgementManager shardAcknowledgementManager) { this.shardAcknowledgementManager = shardAcknowledgementManager; @@ -240,16 +228,6 @@ public Builder waitForExport(boolean waitForExport) { return this; } - public Builder acknowledgmentSet(AcknowledgementSet acknowledgementSet) { - this.acknowledgementSet = acknowledgementSet; - return this; - } - - public Builder acknowledgmentSetTimeout(Duration dataFileAcknowledgmentTimeout) { - this.dataFileAcknowledgmentTimeout = dataFileAcknowledgmentTimeout; - return this; - } - public ShardConsumer build() { return new ShardConsumer(this); } @@ -263,90 +241,88 @@ public void run() { // Check should skip processing or not. if (shouldSkip()) { shardProgress.increment(); - if (acknowledgementSet != null) { - acknowledgementSet.complete(); - } return; } - if (acknowledgementSet != null) { - addProgressCheck(acknowledgementSet); - } - long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = ""; int interval; List records; - try { - while (!shouldStop) { - if (shardIterator == null) { - // End of Shard - LOG.debug("Reached end of shard"); - break; - } + while (!shouldStop) { + if (shardIterator == null) { + // End of Shard + LOG.debug("Reached end of shard"); + break; + } - if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { - LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); - lastCheckpointTime = System.currentTimeMillis(); + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); + if (shardAcknowledgementManager == null) { + checkpointer.checkpoint(sequenceNumber); } + lastCheckpointTime = System.currentTimeMillis(); + } - GetRecordsResponse response = callGetRecords(shardIterator); - shardIterator = response.nextShardIterator(); - if (!response.records().isEmpty()) { - // Always use the last sequence number for checkpoint - sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber(); - Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime(); + GetRecordsResponse response = callGetRecords(shardIterator); + shardIterator = response.nextShardIterator(); + if (!response.records().isEmpty()) { + // Always use the last sequence number for checkpoint + sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber(); + Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime(); - if (lastEventTime.isBefore(startTime)) { - LOG.debug("Get {} events before start time, ignore...", response.records().size()); - continue; - } - if (waitForExport) { - waitForExport(); - waitForExport = false; - } - records = response.records().stream() - .filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime)) - .collect(Collectors.toList()); - recordConverter.writeToBuffer(acknowledgementSet, records); - shardProgress.increment(); - recordsWrittenToBuffer += records.size(); - long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli(); - interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS; - - } else { - interval = GET_RECORD_INTERVAL_MILLS; - shardProgress.increment(); + if (lastEventTime.isBefore(startTime)) { + LOG.debug("Get {} events before start time, ignore...", response.records().size()); + continue; + } + if (waitForExport) { + waitForExport(); + waitForExport = false; } - try { - // Idle between get records call. - Thread.sleep(interval); - } catch (InterruptedException e) { - throw new RuntimeException(e); + AcknowledgementSet acknowledgementSet = null; + if (shardAcknowledgementManager != null) { + final StreamProgressState lastProgressState = streamPartition.getProgressState().orElseThrow(); + final String endingSequenceNumber = lastProgressState.getEndingSequenceNumber(); + acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, lastProgressState.getSequenceNumber(), endingSequenceNumber == null || endingSequenceNumber.isEmpty()); } - } - // interrupted - if (shouldStop) { - // Do last checkpoint and then quit - LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId); - throw new RuntimeException("Consuming shard was interrupted from shutdown"); - } + records = response.records().stream() + .filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime)) + .collect(Collectors.toList()); - if (acknowledgementSet != null) { - acknowledgementSet.complete(); - } + recordConverter.writeToBuffer(acknowledgementSet, records); + if (acknowledgementSet != null) { + acknowledgementSet.complete(); + } - if (waitForExport) { - waitForExport(); + shardProgress.increment(); + recordsWrittenToBuffer += records.size(); + long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli(); + interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS; + + } else { + interval = GET_RECORD_INTERVAL_MILLS; + shardProgress.increment(); } - } catch (final Exception exc) { - if (acknowledgementSet != null) { - acknowledgementSet.cancel(); + + try { + // Idle between get records call. + Thread.sleep(interval); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - throw exc; + } + + // interrupted + if (shouldStop) { + // Do last checkpoint and then quit + LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId); + throw new RuntimeException("Consuming shard was interrupted from shutdown"); + } + + if (waitForExport) { + waitForExport(); } } @@ -377,7 +353,7 @@ private GetRecordsResponse callGetRecords(String shardIterator) { private void waitForExport() { LOG.debug("Start waiting for export to be done and loaded"); int numberOfWaits = 0; - while (!shardAcknowledgementManager.isExportDone(streamPartition)) { + while (!checkpointer.isExportDone()) { LOG.debug("Export is in progress, wait..."); try { shardProgress.increment(); @@ -388,6 +364,9 @@ private void waitForExport() { numberOfWaits++; if (numberOfWaits % DEFAULT_WAIT_COUNT_TO_CHECKPOINT == 0) { // To extend the timeout of lease + if (shardAcknowledgementManager == null) { + checkpointer.checkpoint(null); + } } } catch (InterruptedException e) { LOG.error("Wait for export is interrupted ({})", e.getMessage()); @@ -433,10 +412,4 @@ public static void stopAll() { shouldStop = true; } - private void addProgressCheck(final AcknowledgementSet acknowledgementSet) { - acknowledgementSet.addProgressCheck( - (ignored) -> { - acknowledgementSet.increaseExpiry(ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME); - }, ACKNOWLEDGMENT_PROGRESS_CHECK_INTERVAL); - } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index 2a2df78141..eb101a5638 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -64,7 +63,6 @@ public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordi } public Runnable createConsumer(final StreamPartition streamPartition, - final AcknowledgementSet acknowledgementSet, final Duration shardAcknowledgmentTimeout, final ShardAcknowledgementManager shardAcknowledgementManager) { @@ -77,8 +75,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, Instant startTime = null; boolean waitForExport = false; if (progressState.isPresent()) { - // We can't checkpoint with acks yet - sequenceNumber = acknowledgementSet == null ? null : progressState.get().getSequenceNumber(); + sequenceNumber = shardAcknowledgementManager == null ? null : progressState.get().getSequenceNumber(); waitForExport = progressState.get().shouldWaitForExport(); if (progressState.get().getStartTime() != 0) { startTime = Instant.ofEpochMilli(progressState.get().getStartTime()); @@ -96,7 +93,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, return null; } - + StreamCheckpointer checkpointer = new StreamCheckpointer(enhancedSourceCoordinator, streamPartition); String tableArn = TableUtil.getTableArnFromStreamArn(streamPartition.getStreamArn()); TableInfo tableInfo = getTableInfo(tableArn); @@ -104,6 +101,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator); ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig) .tableInfo(tableInfo) + .checkpointer(checkpointer) .shardAcknowledgementManager(shardAcknowledgementManager) .streamPartition(streamPartition) .shardIterator(shardIterator) @@ -111,8 +109,6 @@ public Runnable createConsumer(final StreamPartition streamPartition, .lastShardIterator(lastShardIterator) .startTime(startTime) .waitForExport(waitForExport) - .acknowledgmentSet(acknowledgementSet) - .acknowledgmentSetTimeout(shardAcknowledgmentTimeout) .build(); return shardConsumer; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index 550166590a..f0c6a6f46d 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -6,13 +6,11 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,10 +47,8 @@ public class StreamScheduler implements Runnable { private final EnhancedSourceCoordinator coordinator; private final ShardConsumerFactory consumerFactory; private final ExecutorService executor; - private final PluginMetrics pluginMetrics; private final AtomicLong activeChangeEventConsumers; private final AtomicLong shardsInProcessing; - private final AcknowledgementSetManager acknowledgementSetManager; private final DynamoDBSourceConfig dynamoDBSourceConfig; private final BackoffCalculator backoffCalculator; private int noAvailableShardsCount = 0; @@ -68,12 +64,12 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator, final BackoffCalculator backoffCalculator) { this.coordinator = coordinator; this.consumerFactory = consumerFactory; - this.pluginMetrics = pluginMetrics; - this.acknowledgementSetManager = acknowledgementSetManager; this.dynamoDBSourceConfig = dynamoDBSourceConfig; this.backoffCalculator = backoffCalculator; - this.shardAcknowledgementManager = new ShardAcknowledgementManager(acknowledgementSetManager, coordinator, dynamoDBSourceConfig); - this.shardAcknowledgementManager.init(coordinator::giveUpPartition); + this.shardAcknowledgementManager = dynamoDBSourceConfig.isAcknowledgmentsEnabled() ? new ShardAcknowledgementManager(acknowledgementSetManager, coordinator, dynamoDBSourceConfig) : null; + if (this.shardAcknowledgementManager != null) { + this.shardAcknowledgementManager.init(coordinator::giveUpPartition); + } executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong()); @@ -81,15 +77,8 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator, } private void processStreamPartition(StreamPartition streamPartition) { - final boolean acknowledgmentsEnabled = dynamoDBSourceConfig.isAcknowledgmentsEnabled(); - AcknowledgementSet acknowledgementSet = null; - if (acknowledgmentsEnabled) { - final StreamProgressState lastProgressState = streamPartition.getProgressState().orElseThrow(); - final String endingSequenceNumber = lastProgressState.getEndingSequenceNumber(); - acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, lastProgressState.getSequenceNumber(), endingSequenceNumber == null || endingSequenceNumber.isEmpty()); - } - Runnable shardConsumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, dynamoDBSourceConfig.getShardAcknowledgmentTimeout(), shardAcknowledgementManager); + Runnable shardConsumer = consumerFactory.createConsumer(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout(), shardAcknowledgementManager); if (shardConsumer != null) { CompletableFuture runConsumer = CompletableFuture.runAsync(shardConsumer, executor); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index 8aff55a48a..d9627ad329 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -28,6 +28,7 @@ import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; +import java.time.Duration; import java.time.Instant; import java.util.Optional; import java.util.UUID; @@ -117,7 +118,7 @@ public void test_create_shardConsumer_correctly() { ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); + Runnable consumer = consumerFactory.createConsumer(streamPartition, Duration.ofMinutes(1), shardAcknowledgementManager); assertThat(consumer, notNullValue()); verify(dynamoDbStreamsClient).getShardIterator(any(GetShardIteratorRequest.class)); @@ -135,7 +136,7 @@ public void test_create_shardConsumer_for_closedShards() { ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); + Runnable consumer = consumerFactory.createConsumer(streamPartition, Duration.ofMinutes(1), shardAcknowledgementManager); assertThat(consumer, notNullValue()); // Should get iterators twice verify(dynamoDbStreamsClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class)); @@ -157,7 +158,7 @@ void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exce ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); + Runnable consumer = consumerFactory.createConsumer(streamPartition, Duration.ofMinutes(1), shardAcknowledgementManager); assertThat(consumer, nullValue()); verify(stream5xxErrors).increment(); verify(streamApiInvocations).increment(); @@ -176,7 +177,7 @@ void stream4xxErrors_is_incremented_when_get_shard_iterator_throws_dynamodb_exce ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); ShardAcknowledgementManager shardAcknowledgementManager = mock(ShardAcknowledgementManager.class); - Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null, shardAcknowledgementManager); + Runnable consumer = consumerFactory.createConsumer(streamPartition, Duration.ofMinutes(1), shardAcknowledgementManager); assertThat(consumer, nullValue()); verify(stream4xxErrors).increment(); verify(streamApiInvocations).increment(); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index e5af69c08d..598ae6e5d0 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -10,14 +10,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; @@ -47,7 +45,6 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; -import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -178,153 +175,174 @@ void setup() throws Exception { @Test void test_run_shardConsumer_correctly() throws Exception { - ShardConsumer shardConsumer; - try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) - ) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) - .shardIterator(shardIterator) - .shardAcknowledgementManager(shardAcknowledgementManager) - .streamPartition(streamPartition) - .tableInfo(tableInfo) - .startTime(null) - .waitForExport(false) - .build(); + // Disable the static shouldStop flag to prevent early exit + try (MockedStatic shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> { + if (invocation.getMethod().getName().equals("stopAll")) { + return null; + } else if (invocation.getMethod().getName().equals("shouldStop")) { + return false; + } + return invocation.callRealMethod(); + })) { + ShardConsumer shardConsumer; + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) + .shardIterator(shardIterator) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + shardConsumer.run(); + + verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); + verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); + verify(bufferAccumulator).flush(); + verify(streamApiInvocations).increment(); + verify(shardProgress).increment(); } - - shardConsumer.run(); - - verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); - verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); - verify(bufferAccumulator).flush(); - verify(streamApiInvocations).increment(); - verify(shardProgress).increment(); } @Test void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); - final Duration acknowledgmentTimeout = Duration.ofSeconds(30); - - ShardConsumer shardConsumer; - try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) - ) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) - .shardIterator(shardIterator) - .shardAcknowledgementManager(shardAcknowledgementManager) - .streamPartition(streamPartition) - .tableInfo(tableInfo) - .startTime(null) - .acknowledgmentSetTimeout(acknowledgmentTimeout) - .acknowledgmentSet(acknowledgementSet) - .waitForExport(false) - .build(); + + // Mock the shardAcknowledgementManager to return our mock acknowledgementSet + lenient().when(shardAcknowledgementManager.createAcknowledgmentSet(any(StreamPartition.class), any(String.class), any(Boolean.class))) + .thenReturn(acknowledgementSet); + + // Disable the static shouldStop flag to prevent early exit + try (MockedStatic shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> { + if (invocation.getMethod().getName().equals("stopAll")) { + return null; + } else if (invocation.getMethod().getName().equals("shouldStop")) { + return false; + } + return invocation.callRealMethod(); + })) { + ShardConsumer shardConsumer; + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) + .shardIterator(shardIterator) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + shardConsumer.run(); + + verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); + verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); + verify(bufferAccumulator).flush(); + verify(streamApiInvocations).increment(); + verify(shardProgress).increment(); } - - shardConsumer.run(); - - final ArgumentCaptor progressCheckConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); - verify(acknowledgementSet).addProgressCheck(progressCheckConsumerArgumentCaptor.capture(), any(Duration.class)); - - final Consumer progressCheckConsumer = progressCheckConsumerArgumentCaptor.getValue(); - progressCheckConsumer.accept(mock(ProgressCheck.class)); - - verify(acknowledgementSet).increaseExpiry(any(Duration.class)); - verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); - verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); - verify(bufferAccumulator).flush(); - verify(acknowledgementSet).complete(); - verify(streamApiInvocations).increment(); - verify(shardProgress).increment(); } @Test void test_run_shardConsumer_with_acknowledgments_and_error_cancels_acknowledgment_set() throws Exception { - final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); - final Duration acknowledgmentTimeout = Duration.ofSeconds(30); - when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(InternalServerErrorException.class); + when(aggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); - ShardConsumer shardConsumer; - try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) - ) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) - .shardIterator(shardIterator) - .shardAcknowledgementManager(shardAcknowledgementManager) - .streamPartition(streamPartition) - .tableInfo(tableInfo) - .startTime(null) - .acknowledgmentSetTimeout(acknowledgmentTimeout) - .acknowledgmentSet(acknowledgementSet) - .waitForExport(false) - .build(); + // Disable the static shouldStop flag to prevent early exit + try (MockedStatic shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> { + if (invocation.getMethod().getName().equals("stopAll")) { + return null; + } + return invocation.callRealMethod(); + })) { + ShardConsumer shardConsumer; + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) + .shardIterator(shardIterator) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + assertThrows(RuntimeException.class, shardConsumer::run); + + verify(stream5xxErrors).increment(); + verify(streamApiInvocations).increment(); } - - assertThrows(RuntimeException.class, shardConsumer::run); - - final ArgumentCaptor progressCheckConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); - verify(acknowledgementSet).addProgressCheck(progressCheckConsumerArgumentCaptor.capture(), any(Duration.class)); - - final Consumer progressCheckConsumer = progressCheckConsumerArgumentCaptor.getValue(); - progressCheckConsumer.accept(mock(ProgressCheck.class)); - - verify(acknowledgementSet).increaseExpiry(any(Duration.class)); - verify(acknowledgementSet).cancel(); } @Test void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { - ShardConsumer shardConsumer; - try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) - .shardIterator(shardIterator) - .shardAcknowledgementManager(shardAcknowledgementManager) - .streamPartition(streamPartition) - .tableInfo(tableInfo) - .startTime(null) - .waitForExport(false) - .build(); - } - + // First set up the mocks for the exception case when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(InternalServerErrorException.class); when(aggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); - - assertThrows(RuntimeException.class, shardConsumer::run); - - verify(stream5xxErrors).increment(); - verify(streamApiInvocations).increment(); + + // Disable the static shouldStop flag to prevent early exit + try (MockedStatic shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> { + if (invocation.getMethod().getName().equals("stopAll")) { + return null; + } + return invocation.callRealMethod(); + })) { + ShardConsumer shardConsumer; + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) + .shardIterator(shardIterator) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + assertThrows(RuntimeException.class, shardConsumer::run); + + verify(stream5xxErrors).increment(); + verify(streamApiInvocations).increment(); + } } @Test void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { - ShardConsumer shardConsumer; - try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { - bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) - .shardIterator(shardIterator) - .shardAcknowledgementManager(shardAcknowledgementManager) - .streamPartition(streamPartition) - .tableInfo(tableInfo) - .startTime(null) - .waitForExport(false) - .build(); - } - + // First set up the mocks for the exception case when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(DynamoDbException.class); when(aggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); - - assertThrows(RuntimeException.class, shardConsumer::run); - - verify(stream4xxErrors).increment(); - verify(streamApiInvocations).increment(); + + // Disable the static shouldStop flag to prevent early exit + try (MockedStatic shardConsumerMockedStatic = mockStatic(ShardConsumer.class, invocation -> { + if (invocation.getMethod().getName().equals("stopAll")) { + return null; + } + return invocation.callRealMethod(); + })) { + ShardConsumer shardConsumer; + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) + .shardIterator(shardIterator) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + assertThrows(RuntimeException.class, shardConsumer::run); + + verify(stream4xxErrors).increment(); + verify(streamApiInvocations).increment(); + } } private List buildRecords(int count) { diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index 8e85da2dcd..d1ff6d382d 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -126,7 +126,13 @@ public void test_normal_run() throws InterruptedException { when(backoffCalculator.calculateBackoffToAcquireNextShard(eq(1), any(AtomicInteger.class))) .thenReturn(10000L); - when(consumerFactory.createConsumer(any(StreamPartition.class), eq(null), any(Duration.class), any(ShardAcknowledgementManager.class))).thenReturn(() -> LOG.info("Hello")); + // Set up the mock for getShardAcknowledgmentTimeout + Duration timeout = Duration.ofMinutes(1); + when(dynamoDBSourceConfig.getShardAcknowledgmentTimeout()).thenReturn(timeout); + when(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + // Set up the mock for createConsumer with the specific timeout + when(consumerFactory.createConsumer(any(StreamPartition.class), eq(timeout), eq(null))).thenReturn(() -> LOG.info("Hello")); when(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.of(streamPartition)).thenReturn(Optional.empty()); scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator); @@ -140,8 +146,8 @@ public void test_normal_run() throws InterruptedException { // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); - // Should start a new consumer - verify(consumerFactory).createConsumer(any(StreamPartition.class), eq(null), any(Duration.class), any(ShardAcknowledgementManager.class)); + // Should start a new consumer with the specific timeout + verify(consumerFactory).createConsumer(any(StreamPartition.class), eq(timeout), eq(null)); // Should mask the stream partition as completed. verify(coordinator).completePartition(any(StreamPartition.class)); @@ -173,7 +179,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException { return acknowledgementSet; }).when(acknowledgementSetManager).create(any(Consumer.class), eq(shardAcknowledgmentTimeout)); - when(consumerFactory.createConsumer(any(StreamPartition.class), eq(acknowledgementSet), eq(shardAcknowledgmentTimeout), any(ShardAcknowledgementManager.class))).thenReturn(() -> LOG.info("Hello")); + when(consumerFactory.createConsumer(any(StreamPartition.class), eq(shardAcknowledgmentTimeout), any(ShardAcknowledgementManager.class))).thenReturn(() -> LOG.info("Hello")); scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator); @@ -188,10 +194,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException { // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); // Should start a new consumer - verify(consumerFactory).createConsumer(any(StreamPartition.class), any(AcknowledgementSet.class), any(Duration.class), any(ShardAcknowledgementManager.class)); - - // Should mask the stream partition as completed. - verify(coordinator).completePartition(any(StreamPartition.class)); + verify(consumerFactory).createConsumer(any(StreamPartition.class), any(Duration.class), any(ShardAcknowledgementManager.class)); verify(activeShardsInProcessing).incrementAndGet(); verify(activeShardsInProcessing).decrementAndGet(); From 8380392e534a4f674455fde4f57d9d911aa94442 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Fri, 18 Jul 2025 11:46:59 -0500 Subject: [PATCH 04/14] Minor changes to respond to PR feedback Signed-off-by: Jonah Calvo --- .../plugins/source/dynamodb/DynamoDBSourceConfig.java | 2 +- .../dynamodb/stream/ShardAcknowledgementManager.java | 8 ++++---- .../plugins/source/dynamodb/stream/StreamScheduler.java | 6 ++---- .../dynamodb/stream/ShardAcknowledgementManagerTest.java | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java index 9d6d3fd358..1f8ace0b54 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java @@ -37,7 +37,7 @@ public class DynamoDBSourceConfig { private boolean acknowledgments = false; @JsonProperty("shard_acknowledgment_timeout") - private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10); + private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(30); @JsonProperty("s3_data_file_acknowledgment_timeout") private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(15); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index c86f9333e4..2f737f284a 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -54,7 +54,8 @@ public class ShardAcknowledgementManager { public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager, final EnhancedSourceCoordinator sourceCoordinator, - final DynamoDBSourceConfig dynamoDBSourceConfig + final DynamoDBSourceConfig dynamoDBSourceConfig, + final Consumer stopWorkerConsumer ) { this.acknowledgementSetManager = acknowledgementSetManager; this.sourceCoordinator = sourceCoordinator; @@ -62,9 +63,8 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor")); this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>()); this.lastCheckpointTime = Instant.now(); - } - - public void init(final Consumer stopWorkerConsumer) { + + // Start monitoring acknowledgments in the constructor executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer)); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index f0c6a6f46d..7bcce86ec5 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -66,10 +66,8 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator, this.consumerFactory = consumerFactory; this.dynamoDBSourceConfig = dynamoDBSourceConfig; this.backoffCalculator = backoffCalculator; - this.shardAcknowledgementManager = dynamoDBSourceConfig.isAcknowledgmentsEnabled() ? new ShardAcknowledgementManager(acknowledgementSetManager, coordinator, dynamoDBSourceConfig) : null; - if (this.shardAcknowledgementManager != null) { - this.shardAcknowledgementManager.init(coordinator::giveUpPartition); - } + this.shardAcknowledgementManager = dynamoDBSourceConfig.isAcknowledgmentsEnabled() ? + new ShardAcknowledgementManager(acknowledgementSetManager, coordinator, dynamoDBSourceConfig, coordinator::giveUpPartition) : null; executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong()); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index de2f7d316e..f4a8b3b740 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -53,7 +53,7 @@ class ShardAcknowledgementManagerTest { @BeforeEach void setUp() { shardAcknowledgementManager = new ShardAcknowledgementManager( - acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig); + acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig, stopWorkerConsumer); } @Test From 9980e8024a58c888958e9b71b8f79a3299f2332a Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Mon, 21 Jul 2025 11:46:51 -0500 Subject: [PATCH 05/14] Better handling of failures during DDB acknowledgements Signed-off-by: Jonah Calvo --- .../source/dynamodb/stream/ShardAcknowledgementManager.java | 6 ++++++ .../plugins/source/dynamodb/stream/ShardConsumer.java | 6 ++++-- .../plugins/source/dynamodb/stream/StreamCheckpointer.java | 4 ++-- .../plugins/source/dynamodb/stream/StreamScheduler.java | 1 + 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index 2f737f284a..20933679d2 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -220,6 +220,12 @@ private void removePartitions() { partitionsToRemove.clear(); } + public void giveUpPartition(final StreamPartition streamPartition) { + sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + checkpoints.remove(streamPartition); + ackStatuses.remove(streamPartition); + } + public boolean isExportDone(StreamPartition streamPartition) { Optional globalPartition = sourceCoordinator.getPartition(streamPartition.getStreamArn()); return globalPartition.isPresent(); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index f795a98183..fd04b84aed 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -241,6 +241,9 @@ public void run() { // Check should skip processing or not. if (shouldSkip()) { shardProgress.increment(); + if (shardAcknowledgementManager != null) { + checkpointer.completePartition(); + } return; } @@ -283,8 +286,7 @@ public void run() { AcknowledgementSet acknowledgementSet = null; if (shardAcknowledgementManager != null) { final StreamProgressState lastProgressState = streamPartition.getProgressState().orElseThrow(); - final String endingSequenceNumber = lastProgressState.getEndingSequenceNumber(); - acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, lastProgressState.getSequenceNumber(), endingSequenceNumber == null || endingSequenceNumber.isEmpty()); + acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, lastProgressState.getSequenceNumber(), shardIterator == null); } records = response.records().stream() diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java index e7ba18688d..62d5559295 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java @@ -62,7 +62,7 @@ public boolean isExportDone() { return globalPartition.isPresent(); } - public void updateShardForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) { - coordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout); + public void completePartition() { + coordinator.completePartition(streamPartition); } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index 7bcce86ec5..e3970c6e94 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -151,6 +151,7 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) { } } else { LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex); + shardAcknowledgementManager.giveUpPartition(streamPartition); coordinator.giveUpPartition(streamPartition); } }; From ea8e1b58a1128cfffa49967e2558e1d999820ed8 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Tue, 22 Jul 2025 16:40:21 -0500 Subject: [PATCH 06/14] Update tracking of partitions to give up Signed-off-by: Jonah Calvo --- .../stream/ShardAcknowledgementManager.java | 16 +++++++++++++--- .../source/dynamodb/stream/StreamScheduler.java | 7 +++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index 20933679d2..8a23228957 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -48,6 +48,7 @@ public class ShardAcknowledgementManager { private final ExecutorService executorService; private final List partitionsToRemove; + private final List partitionsToGiveUp; private boolean shutdownTriggered; private Instant lastCheckpointTime; @@ -62,6 +63,7 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme this.dynamoDBSourceConfig = dynamoDBSourceConfig; this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor")); this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>()); + this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>()); this.lastCheckpointTime = Instant.now(); // Start monitoring acknowledgments in the constructor @@ -132,6 +134,11 @@ boolean runMonitorAcknowledgmentLoop(final Consumer stopWorkerC sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber()); } + if (partitionsToGiveUp.contains(streamPartition)) { + partitionsToRemove.add(streamPartition); + sourceCoordinator.giveUpPartition(streamPartition); + } + } catch (final Exception e) { LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e); } @@ -190,11 +197,13 @@ private void handleFailure(final StreamPartition streamPartition, } partitionsToRemove.add(streamPartition); sourceCoordinator.giveUpPartition(streamPartition); + partitionsToGiveUp.remove(streamPartition); } private void handleCompletedShard(final StreamPartition streamPartition) { sourceCoordinator.completePartition(streamPartition); partitionsToRemove.add(streamPartition); + partitionsToGiveUp.remove(streamPartition); LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey()); } @@ -221,9 +230,10 @@ private void removePartitions() { } public void giveUpPartition(final StreamPartition streamPartition) { - sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); - checkpoints.remove(streamPartition); - ackStatuses.remove(streamPartition); + if (!partitionsToGiveUp.contains(streamPartition)) { + LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey()); + partitionsToGiveUp.add(streamPartition); + } } public boolean isExportDone(StreamPartition streamPartition) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index e3970c6e94..1297b487eb 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -151,8 +151,11 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) { } } else { LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex); - shardAcknowledgementManager.giveUpPartition(streamPartition); - coordinator.giveUpPartition(streamPartition); + if (dynamoDBSourceConfig.isAcknowledgmentsEnabled()) { + shardAcknowledgementManager.giveUpPartition(streamPartition); + } else { + coordinator.giveUpPartition(streamPartition); + } } }; } From f853006150cce7d7bd1722500b2934113c72af6c Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Wed, 23 Jul 2025 13:19:25 -0500 Subject: [PATCH 07/14] Add license headers Signed-off-by: Jonah Calvo --- .../dynamodb/model/ShardCheckpointStatus.java | 13 +++++-------- .../stream/ShardAcknowledgementManager.java | 11 ++++++++++- .../stream/ShardAcknowledgementManagerTest.java | 10 ++++++++++ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java index d8b0208788..b9489fbcef 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.source.dynamodb.model; @@ -49,14 +54,6 @@ public boolean isNegativeAcknowledgement() { return this.acknowledgeStatus == AcknowledgmentStatus.NEGATIVE_ACK; } - public long getCreateTimestamp() { - return createTimestamp; - } - - public long getAcknowledgedTimestamp() { - return acknowledgedTimestamp; - } - public boolean isFinalAcknowledgmentForPartition() { return isFinalAcknowledgmentForPartition; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index 8a23228957..3a205624b5 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -1,3 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; @@ -66,7 +76,6 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>()); this.lastCheckpointTime = Instant.now(); - // Start monitoring acknowledgments in the constructor executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer)); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index f4a8b3b740..988169cba4 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -1,3 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.junit.jupiter.api.BeforeEach; From 8fe06adbbe50238bac58c8516aabd7fd70d9693c Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Fri, 25 Jul 2025 15:56:11 -0500 Subject: [PATCH 08/14] Fix edge case where no records are received after initialization Signed-off-by: Jonah Calvo --- .../source/dynamodb/stream/ShardAcknowledgementManager.java | 4 ++++ .../plugins/source/dynamodb/stream/ShardConsumer.java | 2 ++ 2 files changed, 6 insertions(+) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index 3a205624b5..0c231332e0 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -249,4 +249,8 @@ public boolean isExportDone(StreamPartition streamPartition) { Optional globalPartition = sourceCoordinator.getPartition(streamPartition.getStreamArn()); return globalPartition.isPresent(); } + + public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) { + checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()); + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index fd04b84aed..c33ffdc2d7 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -247,6 +247,8 @@ public void run() { return; } + shardAcknowledgementManager.startUpdatingOwnershipForShard(streamPartition); + long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = ""; int interval; From ccbe7f8f7ed3754c6cf1f830c88ff6ceea7bac90 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Mon, 28 Jul 2025 10:33:29 -0500 Subject: [PATCH 09/14] Updated unit tests Signed-off-by: Jonah Calvo --- .../ShardAcknowledgementManagerTest.java | 22 +++++++++++++++++++ .../dynamodb/stream/ShardConsumerTest.java | 20 +++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index 988169cba4..e182c44fcd 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -22,7 +22,9 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; +import java.lang.reflect.Field; import java.time.Duration; +import java.time.Instant; import java.util.Optional; import java.util.function.Consumer; @@ -93,4 +95,24 @@ void testIsExportDone() { void testShutdown() { assertDoesNotThrow(() -> shardAcknowledgementManager.shutdown()); } + + @Test + void testUpdateOwnershipForAllShardPartitions() throws Exception { + when(dynamoDBSourceConfig.getShardAcknowledgmentTimeout()).thenReturn(Duration.ofMinutes(15)); + when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class))).thenReturn(acknowledgementSet); + + // Create acknowledgment set to add partition to checkpoints + shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, "seq123", false); + + // Set lastCheckpointTime to past to trigger checkpoint interval + Field lastCheckpointTimeField = ShardAcknowledgementManager.class.getDeclaredField("lastCheckpointTime"); + lastCheckpointTimeField.setAccessible(true); + lastCheckpointTimeField.set(shardAcknowledgementManager, Instant.now().minus(Duration.ofMinutes(5))); + + // Call updateOwnershipForAllShardPartitions directly + shardAcknowledgementManager.updateOwnershipForAllShardPartitions(); + + // Verify that saveProgressStateForPartition is called + verify(sourceCoordinator).saveProgressStateForPartition(eq(streamPartition), any(Duration.class)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 598ae6e5d0..860b618f1d 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -345,6 +345,26 @@ void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { } } + @Test + void test_run_shardConsumer_calls_startUpdatingOwnershipForShard() throws Exception { + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + ShardConsumer shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) + .shardIterator(null) + .shardAcknowledgementManager(shardAcknowledgementManager) + .streamPartition(streamPartition) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + + shardConsumer.run(); + + // Verify that startUpdatingOwnershipForShard is called + verify(shardAcknowledgementManager).startUpdatingOwnershipForShard(streamPartition); + } + } + private List buildRecords(int count) { List records = new ArrayList<>(); for (int i = 0; i < count; i++) { From 4a4e00741dae0746e344ce6f38bafedc6b72fa57 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Mon, 28 Jul 2025 10:51:30 -0500 Subject: [PATCH 10/14] Only start updating ownership if Ack manager exists Signed-off-by: Jonah Calvo --- .../plugins/source/dynamodb/stream/ShardConsumer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index c33ffdc2d7..d609a82d67 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -246,9 +246,9 @@ public void run() { } return; } - - shardAcknowledgementManager.startUpdatingOwnershipForShard(streamPartition); - + if (shardAcknowledgementManager != null) { + shardAcknowledgementManager.startUpdatingOwnershipForShard(streamPartition); + } long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = ""; int interval; From 7c858ac59bceee557866e47d18dec467fcee1aeb Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Tue, 29 Jul 2025 15:58:52 -0500 Subject: [PATCH 11/14] Fix shutdown logic in ack manager Signed-off-by: Jonah Calvo --- .../dynamodb/stream/ShardAcknowledgementManager.java | 9 ++++++--- .../plugins/source/dynamodb/stream/ShardConsumer.java | 3 +-- .../plugins/source/dynamodb/stream/StreamScheduler.java | 5 +++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java index 0c231332e0..f4afc5243e 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java @@ -92,9 +92,12 @@ void monitorAcknowledgments(final Consumer stopWorkerConsumer) boolean runMonitorAcknowledgmentLoop(final Consumer stopWorkerConsumer) { removePartitions(); - if (shutdownTriggered && checkpoints.isEmpty()) { - LOG.info("Shutdown was triggered and not waiting on any acknowledgments, exiting cleanly"); - return true; + if (shutdownTriggered) { + LOG.info("Shutdown was triggered giving up partitions and exiting cleanly"); + for (final StreamPartition streamPartition : checkpoints.keySet()) { + sourceCoordinator.giveUpPartition(streamPartition); + } + return true; } for (final StreamPartition streamPartition : checkpoints.keySet()) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index d609a82d67..f80071c939 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -287,8 +287,7 @@ public void run() { AcknowledgementSet acknowledgementSet = null; if (shardAcknowledgementManager != null) { - final StreamProgressState lastProgressState = streamPartition.getProgressState().orElseThrow(); - acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, lastProgressState.getSequenceNumber(), shardIterator == null); + acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null); } records = response.records().stream() diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index 1297b487eb..976965e432 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -131,6 +131,11 @@ public void run() { // Should Stop LOG.warn("Stream Scheduler is interrupted, looks like shutdown has triggered"); + // Shutdown acknowledgment manager if it exists + if (shardAcknowledgementManager != null) { + shardAcknowledgementManager.shutdown(); + } + // Cannot call executor.shutdownNow() here // Otherwise the final checkpoint will fail due to SDK interruption. ShardConsumer.stopAll(); From 528dfae1bc4f4e0f575a03b1e418daaefe3d351e Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Wed, 30 Jul 2025 10:05:30 -0500 Subject: [PATCH 12/14] Minor changes to unit tests Signed-off-by: Jonah Calvo --- data-prepper-plugins/dynamodb-source/build.gradle | 1 + .../plugins/source/dynamodb/stream/ShardConsumer.java | 1 - .../dynamodb/stream/ShardAcknowledgementManagerTest.java | 7 +++---- .../plugins/source/dynamodb/stream/ShardConsumerTest.java | 5 ++--- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index 056691d7b2..09470908d7 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -27,4 +27,5 @@ dependencies { testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation project(':data-prepper-test-common') } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index f80071c939..08f8576802 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -15,7 +15,6 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index e182c44fcd..2f481e0750 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -21,8 +21,8 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; -import java.lang.reflect.Field; import java.time.Duration; import java.time.Instant; import java.util.Optional; @@ -105,9 +105,8 @@ void testUpdateOwnershipForAllShardPartitions() throws Exception { shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, "seq123", false); // Set lastCheckpointTime to past to trigger checkpoint interval - Field lastCheckpointTimeField = ShardAcknowledgementManager.class.getDeclaredField("lastCheckpointTime"); - lastCheckpointTimeField.setAccessible(true); - lastCheckpointTimeField.set(shardAcknowledgementManager, Instant.now().minus(Duration.ofMinutes(5))); + ReflectivelySetField.setField(ShardAcknowledgementManager.class, shardAcknowledgementManager, + "lastCheckpointTime", Instant.now().minus(Duration.ofMinutes(5))); // Call updateOwnershipForAllShardPartitions directly shardAcknowledgementManager.updateOwnershipForAllShardPartitions(); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 860b618f1d..034405898e 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -359,10 +359,9 @@ void test_run_shardConsumer_calls_startUpdatingOwnershipForShard() throws Except .build(); shardConsumer.run(); - - // Verify that startUpdatingOwnershipForShard is called - verify(shardAcknowledgementManager).startUpdatingOwnershipForShard(streamPartition); } + // Verify that startUpdatingOwnershipForShard is called + verify(shardAcknowledgementManager).startUpdatingOwnershipForShard(streamPartition); } private List buildRecords(int count) { From d7f17e47af4bb45aa9419d13c45dd01278283f4d Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Wed, 30 Jul 2025 10:35:35 -0500 Subject: [PATCH 13/14] Minor change to fix build issue Signed-off-by: Jonah Calvo --- data-prepper-plugins/dynamodb-source/build.gradle | 2 +- .../dynamodb/stream/ShardAcknowledgementManagerTest.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index 09470908d7..a79114d501 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -27,5 +27,5 @@ dependencies { testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' - testImplementation project(':data-prepper-test-common') + testImplementation project(path: ':data-prepper-test-common') } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java index 2f481e0750..9b13fe807a 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java @@ -21,13 +21,14 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; -import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import java.time.Duration; import java.time.Instant; import java.util.Optional; import java.util.function.Consumer; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -105,7 +106,7 @@ void testUpdateOwnershipForAllShardPartitions() throws Exception { shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, "seq123", false); // Set lastCheckpointTime to past to trigger checkpoint interval - ReflectivelySetField.setField(ShardAcknowledgementManager.class, shardAcknowledgementManager, + setField(ShardAcknowledgementManager.class, shardAcknowledgementManager, "lastCheckpointTime", Instant.now().minus(Duration.ofMinutes(5))); // Call updateOwnershipForAllShardPartitions directly From 860e78b4083b6de26a5b0c9acc2449e8d84164e9 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Wed, 30 Jul 2025 11:33:30 -0500 Subject: [PATCH 14/14] revert change to build.gradle Signed-off-by: Jonah Calvo --- data-prepper-plugins/dynamodb-source/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index a79114d501..b5bd6fd9cc 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -27,5 +27,5 @@ dependencies { testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' - testImplementation project(path: ':data-prepper-test-common') + testImplementation project(':data-prepper-test:test-common') } \ No newline at end of file