Fix how failures are handled for shard partitions in DDB source#6184
Signed-off-by: Taylor Gray <tylgry@amazon.com>
… condition Signed-off-by: Taylor Gray <tylgry@amazon.com>
}

@VisibleForTesting
public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
There was a problem hiding this comment.
Please make this package protected.
Also, update the constructor above to use this.
public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
        final EnhancedSourceCoordinator sourceCoordinator,
        final DynamoDBSourceConfig dynamoDBSourceConfig,
        final Consumer<StreamPartition> stopWorkerConsumer) {
    this(acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig, createExecutorService(stopWorkerConsumer));
}
private static ExecutorService createExecutorService(final Consumer<StreamPartition> stopWorkerConsumer) {
    ExecutorService executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
    executorService.submit(() -> monitorAcknowledgements(stopWorkerConsumer));
    return executorService;
}
There was a problem hiding this comment.
private static ExecutorService createExecutorService(final Consumer<StreamPartition> stopWorkerConsumer) {
    ExecutorService executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
    executorService.submit(() -> monitorAcknowledgements(stopWorkerConsumer));
    return executorService;
}
The monitorAcknowledgements method is an instance method, so it can't be called from a static method.
There was a problem hiding this comment.
That is just a Runnable. So you should be able to add a Runnable as a parameter to the static method.
Then:
this(acknowledgementSetManager, sourceCoordinator, dynamoDBSourceConfig, createExecutorService(stopWorkerConsumer), () -> monitorAcknowledgements(stopWorkerConsumer))
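To make the constructor-chaining discussion above concrete, here is a minimal sketch of one possible arrangement. It is not the PR's code: `AckManagerSketch`, the `String` shard IDs, and the `monitorRuns` counter are hypothetical stand-ins. It assumes the executor is injected through a second constructor and the monitor task is submitted in that constructor's body, because Java does not allow the arguments of `this(...)` to refer to instance methods.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for ShardAcknowledgementManager; shard IDs are plain strings here.
class AckManagerSketch {
    private final ExecutorService executorService;
    // Counts how often the monitor task ran, only so the sketch can be checked.
    final AtomicInteger monitorRuns = new AtomicInteger();

    // Production-style constructor: creates the executor and delegates.
    AckManagerSketch(final Consumer<String> stopWorkerConsumer) {
        this(createExecutorService(), stopWorkerConsumer);
    }

    // Test-style constructor: the executor is injected by the caller.
    AckManagerSketch(final ExecutorService executorService,
                     final Consumer<String> stopWorkerConsumer) {
        this.executorService = executorService;
        // Instance methods are usable here, unlike inside the arguments of this(...).
        executorService.submit(() -> monitorAcknowledgements(stopWorkerConsumer));
    }

    // Static helper that touches no instance state.
    private static ExecutorService createExecutorService() {
        return Executors.newSingleThreadExecutor();
    }

    // Simplified monitor: runs once and signals a single hypothetical shard to stop.
    private void monitorAcknowledgements(final Consumer<String> stopWorkerConsumer) {
        monitorRuns.incrementAndGet();
        stopWorkerConsumer.accept("shard-0001");
    }

    // Stops the executor and waits for the submitted task; returns true on clean termination.
    boolean shutdown() {
        executorService.shutdown();
        try {
            return executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With this shape, a test can pass in its own `ExecutorService` (for example a mock or a same-thread executor) while production code keeps using the single-argument constructor.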
callback1.accept(true); // Positive ack
callback2.accept(true);

// doThrow(PartitionUpdateException.class).when(sourceCoordinator).saveProgressStateForPartition(eq(streamPartition),
There was a problem hiding this comment.
Please remove this commented out code.
if (shardAcknowledgementManager != null && !shardAcknowledgementManager.isStillTrackingShard(streamPartition)) {
    LOG.warn("Shard {} is no longer being tracked by the acknowledgment manager, exiting", streamPartition.getShardId());
    break;
There was a problem hiding this comment.
Maybe for a future PR, but both this and the exception seem to result in ShardConsumer threads going away without any real tracking of them.
There was a problem hiding this comment.
What would the ideal behavior be here? This just has the ShardConsumer exit.
There was a problem hiding this comment.
I think we may be ok here. I found this code that handles exceptions:
There was a problem hiding this comment.
Is giveUpPartition idempotent? If not, we might need a change here since we have already lost the partition in this particular case.
There was a problem hiding this comment.
Yes, this is what happens when ShardConsumer throws an error.
And giveUpPartition would work if called twice, as long as another worker hasn't already grabbed ownership of the shard. However, I tried to ensure it is only called once by putting this check in the giveUpPartition method of ShardAcknowledgementManager:
public void giveUpPartition(final StreamPartition streamPartition) {
    if (isStillTrackingShard(streamPartition)) {
        LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
        partitionsToGiveUp.add(streamPartition);
        partitionsToRemove.add(streamPartition);
    }
}
So this call only marks the partition to be given up by the acknowledgment manager; it doesn't actually give up the partition itself. As a result, the real give-up is only called once.
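The "mark now, give up later" idea described above can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `GiveUpTrackerSketch` and `processGiveUps` are hypothetical names, shards are plain strings, and the pending collection is assumed to be a set so that repeated marks collapse into one.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker; the real manager works with StreamPartition objects.
class GiveUpTrackerSketch {
    private final Set<String> trackedShards = ConcurrentHashMap.newKeySet();
    private final Set<String> partitionsToGiveUp = ConcurrentHashMap.newKeySet();

    void startTracking(final String shardId) {
        trackedShards.add(shardId);
    }

    boolean isStillTrackingShard(final String shardId) {
        return trackedShards.contains(shardId);
    }

    // Only marks the shard; returns true if this call added a new mark.
    boolean giveUpPartition(final String shardId) {
        if (!isStillTrackingShard(shardId)) {
            return false; // already released or never tracked: nothing to do
        }
        return partitionsToGiveUp.add(shardId); // false when already marked
    }

    // One pass of a monitor loop: releases each marked shard once and returns the count.
    int processGiveUps() {
        int released = 0;
        for (final String shardId : partitionsToGiveUp) {
            if (partitionsToGiveUp.remove(shardId)) {
                trackedShards.remove(shardId); // the actual give-up would happen here
                released++;
            }
        }
        return released;
    }
}
```

Calling `giveUpPartition` twice for the same shard leaves a single pending mark, and once the monitor pass has released the shard, further calls are ignored because the shard is no longer tracked.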
Signed-off-by: Taylor Gray <tylgry@amazon.com>
…when checkpointing fails Signed-off-by: Taylor Gray <tylgry@amazon.com>
checkpoints.remove(streamPartition);
ackStatuses.remove(streamPartition);
});
lock.lock();
There was a problem hiding this comment.
It looks like all the collections this logic deals with are already synchronized. Do we still need this lock?
There was a problem hiding this comment.
Each collection is synchronized on its own, but this block of code modifies multiple collections together.
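A small sketch of why the lock can still matter, assuming two concurrent maps that must always hold the same keys. `CompoundUpdateSketch` and its method names are hypothetical; only the `checkpoints` and `ackStatuses` names are borrowed from the diff above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: each map is thread-safe, but the pair is only consistent under the lock.
class CompoundUpdateSketch {
    private final Map<String, String> checkpoints = new ConcurrentHashMap<>();
    private final Map<String, Boolean> ackStatuses = new ConcurrentHashMap<>();
    private final Lock lock = new ReentrantLock();

    // Adds the shard to both maps as one step.
    void track(final String shardId, final String sequenceNumber) {
        lock.lock();
        try {
            checkpoints.put(shardId, sequenceNumber);
            ackStatuses.put(shardId, Boolean.FALSE);
        } finally {
            lock.unlock();
        }
    }

    // Removes the shard from both maps as one step.
    void remove(final String shardId) {
        lock.lock();
        try {
            checkpoints.remove(shardId);
            ackStatuses.remove(shardId);
        } finally {
            lock.unlock();
        }
    }

    // Readers holding the lock never see a shard present in one map but not the other.
    boolean isConsistent() {
        lock.lock();
        try {
            return checkpoints.keySet().equals(ackStatuses.keySet());
        } finally {
            lock.unlock();
        }
    }
}
```

Without the lock, a reader could run between the two `remove` calls and find a shard that has an acknowledgment status but no checkpoint, even though neither map is ever corrupted individually.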
Signed-off-by: Taylor Gray <tylgry@amazon.com>
try {
    acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
} catch (final ShardNotTrackedException e) {
    LOG.warn(e.getMessage());
There was a problem hiding this comment.
Improve this log message:
LOG.warn("Not creating acknowledgment set since shard is not tracked: {}", e.getMessage());
There are two reasons for this:
- When somebody sees this warning, it will be hard to know where it originated unless line logging is enabled.
- Unknown strings shouldn't be passed to a logger since it is treated as a format string.
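The second point can be illustrated by analogy with `String.format` from the standard library. This is not the logger's own behavior (SLF4J-style loggers use `{}` placeholders rather than `%`), and `FormatStringSketch` is a hypothetical name; the sketch only shows the general hazard of letting unknown text act as the format string instead of as an argument.

```java
import java.util.IllegalFormatException;

// Analogy only: String.format stands in for a logger's message formatting.
class FormatStringSketch {
    // Risky: the unknown text itself is interpreted as the format string.
    static String unsafe(final String unknown) {
        return String.format(unknown);
    }

    // Safer: a fixed format string, with the unknown text passed as an argument.
    static String safe(final String unknown) {
        return String.format("Not creating acknowledgment set since shard is not tracked: %s", unknown);
    }

    // Reports whether the risky variant fails for the given text.
    static boolean unsafeThrows(final String unknown) {
        try {
            unsafe(unknown);
            return false;
        } catch (final IllegalFormatException e) {
            return true;
        }
    }
}
```

A message such as `"disk at 90%"` breaks the risky variant because the trailing `%` is parsed as an incomplete format specifier, while the safer variant passes the text through unchanged.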
There was a problem hiding this comment.
Makes sense, will change this.
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Description
This change fixes how the DynamoDB source handles failures with acknowledgments, including ack timeouts, negative acknowledgments, and failure to update partition state.
When these failure cases happen, we will now signal the ShardConsumer to stop processing the shard.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.