Fix how failures are handled for shard partitions in DDB source #6184
Changes from all commits
a29f545
0094d95
a188f93
3daef6e
d27ee50
d0170f8
@@ -272,6 +272,11 @@ public void run() {
     lastCheckpointTime = System.currentTimeMillis();
 }

+if (shardAcknowledgementManager != null && !shardAcknowledgementManager.isStillTrackingShard(streamPartition)) {
+    LOG.warn("Shard {} is no longer being tracked by the acknowledgment manager, exiting", streamPartition.getShardId());
+    break;
Member
Maybe for a future PR, but this and the exception seem to just result in

Member
Author
What would the ideal behavior be here? This does just have the ShardConsumer exit

Member
I think we may be ok here. I found this code that handles exceptions: Lines 157 to 164 in 2c66d59

Member
Is

Member
Author
Yes, this is what happens when ShardConsumer throws an error. And So basically this call will only mark the partition to be given up by the acknowledgment manager and isn't actually calling give up partition. So it will only be called once.
+}

 GetRecordsResponse response = callGetRecords(shardIterator);
 shardIterator = response.nextShardIterator();
 if (!response.records().isEmpty()) {
@@ -290,7 +295,13 @@ public void run() {

 AcknowledgementSet acknowledgementSet = null;
 if (shardAcknowledgementManager != null) {
-    acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
+    try {
+        acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
+    } catch (final ShardNotTrackedException e) {
+        LOG.warn("Not creating acknowledgment set since shard is not tracked: {}", e.getMessage());
+        break;
+    }

     if (shardIterator == null) {
         createdFinalAcknowledgmentSetForShard = true;
     }
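The two exit paths added in the hunks above can be sketched in isolation. This is a minimal sketch under stated assumptions, not the project's code: `ShardTracker`, `ShardLoopSketch`, `pollShard`, the `dropAtBatch` parameter, and the use of plain string shard IDs are all hypothetical stand-ins for the acknowledgment manager and `ShardConsumer`. Only the `ShardNotTrackedException` name, the `isStillTrackingShard` check, and the check-then-break shape come from the diff.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Same shape as the exception added in this PR.
class ShardNotTrackedException extends RuntimeException {
    ShardNotTrackedException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the acknowledgment manager's tracking state.
class ShardTracker {
    private final Set<String> tracked = ConcurrentHashMap.newKeySet();

    void track(String shardId) {
        tracked.add(shardId);
    }

    void stopTracking(String shardId) {
        tracked.remove(shardId);
    }

    boolean isStillTrackingShard(String shardId) {
        return tracked.contains(shardId);
    }

    // Throws if the shard is no longer tracked when the set is requested.
    String createAcknowledgmentSet(String shardId, int batch) {
        if (!tracked.contains(shardId)) {
            throw new ShardNotTrackedException("Shard " + shardId + " is not tracked");
        }
        return shardId + "#" + batch;
    }
}

class ShardLoopSketch {
    // Polls up to maxBatches and returns how many acknowledgment sets were created.
    // dropAtBatch simulates the shard being dropped during that batch (negative = never).
    static int pollShard(ShardTracker tracker, String shardId, int maxBatches, int dropAtBatch) {
        int created = 0;
        for (int batch = 0; batch < maxBatches; batch++) {
            // Exit path 1: stop before fetching more records for an untracked shard.
            if (!tracker.isStillTrackingShard(shardId)) {
                break;
            }
            if (batch == dropAtBatch) {
                // Simulates another thread dropping the shard right after the check.
                tracker.stopTracking(shardId);
            }
            try {
                tracker.createAcknowledgmentSet(shardId, batch);
                created++;
            } catch (ShardNotTrackedException e) {
                // Exit path 2: the shard was dropped between the check and this call.
                break;
            }
        }
        return created;
    }

    public static void main(String[] args) {
        ShardTracker tracker = new ShardTracker();
        tracker.track("shard-1");
        System.out.println(pollShard(tracker, "shard-1", 5, 2));
    }
}
```

The up-front check alone leaves a window in which the shard can be dropped before the acknowledgment set is created, which is why the sketch, like the diff, handles the exception as a second exit path.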
@@ -0,0 +1,12 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.stream;
+
+public class ShardNotTrackedException extends RuntimeException {
+    public ShardNotTrackedException(String message) {
+        super(message);
+    }
+}
It looks like all the collections this logic deals with are already synchronized. Do we still need this lock?
Each collection is synchronized on its own, but this block of code modifies multiple collections together.
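The point in this reply can be illustrated with a small sketch. The class under review is not shown in this excerpt, so everything here is an assumption made for illustration: `ShardRegistry`, its two collections, and its method names are hypothetical. The sketch only shows the general idea that individually synchronized collections still need a shared lock when one logical step updates more than one of them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry; names and fields are illustrative, not the PR's code.
class ShardRegistry {
    // Each collection is thread-safe for single operations only.
    private final Map<String, Long> checkpoints = Collections.synchronizedMap(new HashMap<>());
    private final Set<String> markedForGiveUp = Collections.synchronizedSet(new HashSet<>());
    // Guards steps that touch both collections together.
    private final ReentrantLock lock = new ReentrantLock();

    void track(String shardId) {
        lock.lock();
        try {
            checkpoints.put(shardId, 0L);
            markedForGiveUp.remove(shardId);
        } finally {
            lock.unlock();
        }
    }

    // Moves a shard from "tracked" to "marked for give-up" as one step.
    // Returns false if the shard was not tracked, so repeated calls do nothing.
    boolean markForGiveUp(String shardId) {
        lock.lock();
        try {
            if (checkpoints.remove(shardId) == null) {
                return false;
            }
            markedForGiveUp.add(shardId);
            return true;
        } finally {
            lock.unlock();
        }
    }

    boolean isStillTracking(String shardId) {
        return checkpoints.containsKey(shardId);
    }

    // Under the lock, a shard is never observed in both collections at once.
    boolean isInBothCollections(String shardId) {
        lock.lock();
        try {
            return checkpoints.containsKey(shardId) && markedForGiveUp.contains(shardId);
        } finally {
            lock.unlock();
        }
    }
}
```

Without the lock, a reader could run between the `remove` and the `add` and see a shard that is in neither collection, even though each call on its own is synchronized.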