Skip to content
Merged
2 changes: 2 additions & 0 deletions data-prepper-plugins/dynamodb-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ dependencies {

implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:common')


testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test:test-common')
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class DynamoDBSourceConfig {
private boolean acknowledgments = false;

@JsonProperty("shard_acknowledgment_timeout")
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10);
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(30);

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(15);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Comment thread
JonahCalvo marked this conversation as resolved.
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.model;

import java.time.Duration;
import java.time.Instant;

public class ShardCheckpointStatus {
private final String sequenceNumber;

private final boolean isFinalAcknowledgmentForPartition;
private AcknowledgmentStatus acknowledgeStatus;
private final long createTimestamp;
private Long acknowledgedTimestamp;

public enum AcknowledgmentStatus {
POSITIVE_ACK,
NEGATIVE_ACK,
NO_ACK
}

public ShardCheckpointStatus(final String sequenceNumber, final long createTimestamp, final boolean isFinalAcknowledgmentForPartition) {
this.sequenceNumber = sequenceNumber;
this.acknowledgeStatus = AcknowledgmentStatus.NO_ACK;
this.createTimestamp = createTimestamp;
this.isFinalAcknowledgmentForPartition = isFinalAcknowledgmentForPartition;
}

public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
this.acknowledgedTimestamp = acknowledgedTimestamp;
}

public void setAcknowledged(final AcknowledgmentStatus acknowledgmentStatus) {
this.acknowledgeStatus = acknowledgmentStatus;
}

public String getSequenceNumber() {
return sequenceNumber;
}

public boolean isPositiveAcknowledgement() {
return this.acknowledgeStatus == AcknowledgmentStatus.POSITIVE_ACK;
}

public boolean isNegativeAcknowledgement() {
return this.acknowledgeStatus == AcknowledgmentStatus.NEGATIVE_ACK;
}

public boolean isFinalAcknowledgmentForPartition() {
return isFinalAcknowledgmentForPartition;
}

public boolean isExpired(final Duration expiredDuration) {
return Duration.between(Instant.ofEpochMilli(createTimestamp), Instant.now()).compareTo(expiredDuration) > 0;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.stream;
Comment thread
JonahCalvo marked this conversation as resolved.

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.opensearch.dataprepper.plugins.source.dynamodb.model.ShardCheckpointStatus;

public class ShardAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(ShardAcknowledgementManager.class);

private static final String NULL_SEQUENCE_NUMBER = "null";

private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L;

static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2);

private final DynamoDBSourceConfig dynamoDBSourceConfig;
private final Map<StreamPartition, ConcurrentLinkedQueue<ShardCheckpointStatus>> checkpoints = new ConcurrentHashMap<>();
private final ConcurrentHashMap<StreamPartition, ConcurrentHashMap<String, ShardCheckpointStatus>> ackStatuses = new ConcurrentHashMap<>();

private final AcknowledgementSetManager acknowledgementSetManager;

private final EnhancedSourceCoordinator sourceCoordinator;

private final ExecutorService executorService;
private final List<StreamPartition> partitionsToRemove;
private final List<StreamPartition> partitionsToGiveUp;
private boolean shutdownTriggered;

private Instant lastCheckpointTime;

public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final EnhancedSourceCoordinator sourceCoordinator,
final DynamoDBSourceConfig dynamoDBSourceConfig,
final Consumer<StreamPartition> stopWorkerConsumer
) {
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinator = sourceCoordinator;
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>());
this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>());
this.lastCheckpointTime = Instant.now();

executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer));
}

void monitorAcknowledgments(final Consumer<StreamPartition> stopWorkerConsumer) {
while (!Thread.currentThread().isInterrupted()) {
boolean exit = runMonitorAcknowledgmentLoop(stopWorkerConsumer);
if (exit) {
break;
}
}

LOG.info("Exiting acknowledgment manager");
}

boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerConsumer) {
removePartitions();
Comment thread
JonahCalvo marked this conversation as resolved.
if (shutdownTriggered) {
LOG.info("Shutdown was triggered giving up partitions and exiting cleanly");
for (final StreamPartition streamPartition : checkpoints.keySet()) {
sourceCoordinator.giveUpPartition(streamPartition);
}
return true;
}

for (final StreamPartition streamPartition : checkpoints.keySet()) {
try {
final StreamProgressState streamProgressState = streamPartition.getProgressState().orElseThrow();
final ConcurrentLinkedQueue<ShardCheckpointStatus> checkpointStatuses = checkpoints.get(streamPartition);
ShardCheckpointStatus latestCheckpointForShard = null;
boolean gaveUpPartition = false;
while (!checkpointStatuses.isEmpty()) {
updateOwnershipForAllShardPartitions();

if (checkpointStatuses.peek().isPositiveAcknowledgement()) {
latestCheckpointForShard = checkpointStatuses.poll();
} else if (checkpointStatuses.peek().isNegativeAcknowledgement()
|| checkpointStatuses.peek().isExpired(dynamoDBSourceConfig.getShardAcknowledgmentTimeout())) {
handleFailure(streamPartition, streamProgressState, latestCheckpointForShard);
gaveUpPartition = true;

if (checkpointStatuses.peek().isNegativeAcknowledgement()) {
LOG.warn("Received negative acknowledgment for partition {} with sequence number {}, giving up partition",
streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber());
} else {
LOG.warn("Acknowledgment timed out for partition {} with sequence number {}, giving up partition",
streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber());
}

stopWorkerConsumer.accept(streamPartition);
break;
} else {
break;
}
}

if (!gaveUpPartition) {
updateOwnershipForAllShardPartitions();
}

if (gaveUpPartition || latestCheckpointForShard == null) {
continue;
}

if (latestCheckpointForShard.isFinalAcknowledgmentForPartition()) {
handleCompletedShard(streamPartition);
} else {
streamProgressState.setSequenceNumber(Objects.equals(latestCheckpointForShard.getSequenceNumber(), NULL_SEQUENCE_NUMBER) ? null : latestCheckpointForShard.getSequenceNumber());
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber());
}
if (partitionsToGiveUp.contains(streamPartition)) {
partitionsToRemove.add(streamPartition);
sourceCoordinator.giveUpPartition(streamPartition);
}

} catch (final Exception e) {
LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e);
}
}

return false;
}

public AcknowledgementSet createAcknowledgmentSet(
final StreamPartition streamPartition,
final String sequenceNumber,
final boolean isFinalSetForPartition) {
final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber;
final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus(sequenceNumber, Instant.now().toEpochMilli(), isFinalSetForPartition);
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()).add(shardCheckpointStatus);
ackStatuses.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>());
ackStatuses.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus);

return acknowledgementSetManager.create((result) -> {
if (ackStatuses.containsKey(streamPartition) && ackStatuses.get(streamPartition).containsKey(sequenceNumberNoNull)) {
final ShardCheckpointStatus ackCheckpointStatus = ackStatuses.get(streamPartition).get(sequenceNumberNoNull);

ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());

if (result) {
LOG.debug("Received acknowledgment of completion from sink for partition {} with sequence number {}",
streamPartition.getPartitionKey(), sequenceNumberNoNull);
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
} else {
LOG.warn("Negative acknowledgment received for partition {} with sequence number {}",
streamPartition.getPartitionKey(), sequenceNumberNoNull);
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
}
}
}, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
Comment thread
JonahCalvo marked this conversation as resolved.
}

void updateOwnershipForAllShardPartitions() {
if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) {
for (final StreamPartition streamPartition : checkpoints.keySet()) {
if (!partitionsToRemove.contains(streamPartition)) {
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
}
}

lastCheckpointTime = Instant.now();
}
}

private void handleFailure(final StreamPartition streamPartition,
final StreamProgressState streamProgressState,
final ShardCheckpointStatus latestCheckpointForShard) {
if (latestCheckpointForShard != null) {
streamProgressState.setSequenceNumber(latestCheckpointForShard.getSequenceNumber());
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
}
partitionsToRemove.add(streamPartition);
sourceCoordinator.giveUpPartition(streamPartition);
partitionsToGiveUp.remove(streamPartition);
Comment thread
JonahCalvo marked this conversation as resolved.
}

private void handleCompletedShard(final StreamPartition streamPartition) {
sourceCoordinator.completePartition(streamPartition);
partitionsToRemove.add(streamPartition);
partitionsToGiveUp.remove(streamPartition);
LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey());
}

public void shutdown() {
shutdownTriggered = true;
executorService.shutdown();
try {
if (!executorService.awaitTermination(WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT, TimeUnit.MINUTES)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}

private void removePartitions() {
partitionsToRemove.forEach(streamPartition -> {
checkpoints.remove(streamPartition);
ackStatuses.remove(streamPartition);
});

partitionsToRemove.clear();
}

public void giveUpPartition(final StreamPartition streamPartition) {
if (!partitionsToGiveUp.contains(streamPartition)) {
LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
partitionsToGiveUp.add(streamPartition);
}
}

public boolean isExportDone(StreamPartition streamPartition) {
Optional<EnhancedSourcePartition> globalPartition = sourceCoordinator.getPartition(streamPartition.getStreamArn());
return globalPartition.isPresent();
}

public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a unit test where we make this call to ShardAcknowledgmentManager, and then run the loop and verify ownership update is called for that partition

checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>());
}
}
Loading
Loading