Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
Expand All @@ -35,18 +37,22 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.opensearch.dataprepper.plugins.source.dynamodb.model.ShardCheckpointStatus;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

public class ShardAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(ShardAcknowledgementManager.class);

private static final String NULL_SEQUENCE_NUMBER = "null";

private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L;

static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2);
static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(3);

private final DynamoDBSourceConfig dynamoDBSourceConfig;
private final Map<StreamPartition, ConcurrentLinkedQueue<ShardCheckpointStatus>> checkpoints = new ConcurrentHashMap<>();
Expand All @@ -62,20 +68,35 @@ public class ShardAcknowledgementManager {
private boolean shutdownTriggered;

private Instant lastCheckpointTime;
private Lock lock;

public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final EnhancedSourceCoordinator sourceCoordinator,
final DynamoDBSourceConfig dynamoDBSourceConfig,
final Consumer<StreamPartition> stopWorkerConsumer
) {
this(acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig,
createExecutorService(), stopWorkerConsumer);
}

private static ExecutorService createExecutorService() {
return Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
}

@VisibleForTesting
ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
final EnhancedSourceCoordinator sourceCoordinator,
final DynamoDBSourceConfig dynamoDBSourceConfig,
final ExecutorService executorService,
final Consumer<StreamPartition> stopWorkerConsumer) {
this.executorService = executorService;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinator = sourceCoordinator;
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>());
this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>());
this.lastCheckpointTime = Instant.now();

this.lock = new ReentrantLock();
executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer));
}

Expand Down Expand Up @@ -148,18 +169,21 @@ boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerC

if (latestCheckpointForShard.isFinalAcknowledgmentForPartition()) {
handleCompletedShard(streamPartition);
} else {
} else if (!partitionsToRemove.contains(streamPartition)) {
streamProgressState.setSequenceNumber(Objects.equals(latestCheckpointForShard.getSequenceNumber(), NULL_SEQUENCE_NUMBER) ? null : latestCheckpointForShard.getSequenceNumber());
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
try {
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
} catch (final PartitionUpdateException e) {
LOG.warn("Failed to checkpoint shard {}, stop processing shard. This shard will be processed by another worker.", streamPartition.getPartitionKey());
partitionsToRemove.add(streamPartition);
}
LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber());
}
if (partitionsToGiveUp.contains(streamPartition)) {
} catch (final Exception e) {
LOG.error(NOISY, "Received exception while monitoring acknowledgments for stream partition {}, stop processing shard", streamPartition.getPartitionKey(), e);
if (!partitionsToRemove.contains(streamPartition)) {
partitionsToRemove.add(streamPartition);
sourceCoordinator.giveUpPartition(streamPartition);
}

} catch (final Exception e) {
LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e);
}
}

Expand All @@ -172,7 +196,21 @@ public AcknowledgementSet createAcknowledgmentSet(
final boolean isFinalSetForPartition) {
final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber;
final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus(sequenceNumber, Instant.now().toEpochMilli(), isFinalSetForPartition);
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()).add(shardCheckpointStatus);

// Shard should already be in checkpoints map from call to startUpdatingOwnershipForShard, if it is not in the map
// that means that ShardAcknowledgmentManager stopped tracking it due to some error, and another worker will pick it up
// We throw an error in this case to have the ShardConsumer exit and stop reading data from the shard
lock.lock();
try {
ConcurrentLinkedQueue<ShardCheckpointStatus> queue = checkpoints.get(streamPartition);
if (queue == null) {
throw new ShardNotTrackedException("The shard {} is not being tracked anymore, stop reading from shard");
}
queue.add(shardCheckpointStatus);
} finally {
lock.unlock();
}

ackStatuses.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>());
ackStatuses.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus);

Expand All @@ -187,7 +225,7 @@ public AcknowledgementSet createAcknowledgmentSet(
streamPartition.getPartitionKey(), sequenceNumberNoNull);
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
} else {
LOG.warn("Negative acknowledgment received for partition {} with sequence number {}",
LOG.debug(NOISY, "Negative acknowledgment received for partition {} with sequence number {}",
streamPartition.getPartitionKey(), sequenceNumberNoNull);
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
}
Expand All @@ -196,10 +234,16 @@ public AcknowledgementSet createAcknowledgmentSet(
}

void updateOwnershipForAllShardPartitions() {

if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) {
for (final StreamPartition streamPartition : checkpoints.keySet()) {
if (!partitionsToRemove.contains(streamPartition)) {
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
try {
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
} catch (final PartitionUpdateException e) {
LOG.warn(NOISY, "Failed to update progress state for shard {}, will stop tracking this shard as someone else owns it", streamPartition.getShardId());
partitionsToRemove.add(streamPartition);
}
}
}

Expand All @@ -214,9 +258,8 @@ private void handleFailure(final StreamPartition streamPartition,
streamProgressState.setSequenceNumber(latestCheckpointForShard.getSequenceNumber());
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
}
partitionsToRemove.add(streamPartition);
sourceCoordinator.giveUpPartition(streamPartition);
partitionsToGiveUp.remove(streamPartition);

markPartitionForRemoval(streamPartition);
}

private void handleCompletedShard(final StreamPartition streamPartition) {
Expand All @@ -240,18 +283,34 @@ public void shutdown() {
}

private void removePartitions() {
partitionsToRemove.forEach(streamPartition -> {
checkpoints.remove(streamPartition);
ackStatuses.remove(streamPartition);
});
lock.lock();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like all the collections the logic is dealing with - are already synchronized. Do we still need this lock?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any given collection is synchronized. But, this block of code is modifying multiple collections together.

try {
partitionsToRemove.forEach(streamPartition -> {
checkpoints.remove(streamPartition);
ackStatuses.remove(streamPartition);
});

partitionsToGiveUp.forEach(partition -> {
try {
sourceCoordinator.giveUpPartition(partition);
LOG.info("Gave up partition for shard {}", partition.getShardId());
} catch (final PartitionUpdateException e) {
LOG.warn("Received exception giving up shard {}, this shard will be reprocessed once the ownership timeout expires.", partition.getShardId());
}
});

partitionsToRemove.clear();
partitionsToRemove.clear();
partitionsToGiveUp.clear();
} finally {
lock.unlock();
}
}

public void giveUpPartition(final StreamPartition streamPartition) {
if (!partitionsToGiveUp.contains(streamPartition)) {
if (isStillTrackingShard(streamPartition)) {
LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
partitionsToGiveUp.add(streamPartition);
partitionsToRemove.add(streamPartition);
}
}

Expand All @@ -260,7 +319,22 @@ public boolean isExportDone(StreamPartition streamPartition) {
return globalPartition.isPresent();
}

public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>());
}

boolean isStillTrackingShard(final StreamPartition streamPartition) {
return isStillTrackingShardInternal(streamPartition);
}

private boolean isStillTrackingShardInternal(final StreamPartition streamPartition) {
return !partitionsToRemove.contains(streamPartition) && checkpoints.containsKey(streamPartition);
}

private void markPartitionForRemoval(final StreamPartition streamPartition) {
if (!partitionsToRemove.contains(streamPartition)) {
partitionsToRemove.add(streamPartition);
partitionsToGiveUp.add(streamPartition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public void run() {
lastCheckpointTime = System.currentTimeMillis();
}

if (shardAcknowledgementManager != null && !shardAcknowledgementManager.isStillTrackingShard(streamPartition)) {
LOG.warn("Shard {} is no longer being tracked by the acknowledgment manager, exiting", streamPartition.getShardId());
break;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for a future PR, but this and the exception seem to just result in ShardConsumer threads just going away without any real tracking of them.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the ideal behavior be here? This does just have the ShardConsumer exit

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may be ok here. I found this code that handles exceptions:

} else {
LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex);
if (dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
shardAcknowledgementManager.giveUpPartition(streamPartition);
} else {
coordinator.giveUpPartition(streamPartition);
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is giveUpPartition idempotent? If not, we might need a change here since we have already lost the partition in this particular case.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is what happens when ShardConsumer throws an error.

And giveUpPartition would work if called twice as long as another worker hasn't already grabbed ownership of the shard. However, I tried to handle this to only be called once by putting this check in the giveUpPartition method of shardAcknowledgmentManager

public void giveUpPartition(final StreamPartition streamPartition) {
        if (isStillTrackingShard(streamPartition)) {
            LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
            partitionsToGiveUp.add(streamPartition);
            partitionsToRemove.add(streamPartition);
        }
    }

So basically this call will only mark the partition to be given up by the acknowledgment manager and isn't actually calling give up partition. So it will only be called once.

}

GetRecordsResponse response = callGetRecords(shardIterator);
shardIterator = response.nextShardIterator();
if (!response.records().isEmpty()) {
Expand All @@ -290,7 +295,13 @@ public void run() {

AcknowledgementSet acknowledgementSet = null;
if (shardAcknowledgementManager != null) {
acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
try {
acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
} catch (final ShardNotTrackedException e) {
LOG.warn("Not creating acknowledgment set since shard is not tracked: {}", e.getMessage());
break;
}

if (shardIterator == null) {
createdFinalAcknowledgmentSetForShard = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

public class ShardNotTrackedException extends RuntimeException {
public ShardNotTrackedException(String message) {
super(message);
}
}
Loading
Loading