1010
1111package org .opensearch .dataprepper .plugins .source .dynamodb .stream ;
1212
13+ import com .google .common .annotations .VisibleForTesting ;
1314import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSet ;
1415import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSetManager ;
1516import org .opensearch .dataprepper .model .source .coordinator .enhanced .EnhancedSourceCoordinator ;
1617import org .opensearch .dataprepper .common .concurrent .BackgroundThreadFactory ;
1718import org .opensearch .dataprepper .model .source .coordinator .enhanced .EnhancedSourcePartition ;
19+ import org .opensearch .dataprepper .model .source .coordinator .exceptions .PartitionUpdateException ;
1820import org .opensearch .dataprepper .plugins .source .dynamodb .DynamoDBSourceConfig ;
1921import org .opensearch .dataprepper .plugins .source .dynamodb .coordination .partition .StreamPartition ;
2022import org .opensearch .dataprepper .plugins .source .dynamodb .coordination .state .StreamProgressState ;
3941
4042import org .opensearch .dataprepper .plugins .source .dynamodb .model .ShardCheckpointStatus ;
4143
44+ import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
45+
4246public class ShardAcknowledgementManager {
4347 private static final Logger LOG = LoggerFactory .getLogger (ShardAcknowledgementManager .class );
4448
4549 private static final String NULL_SEQUENCE_NUMBER = "null" ;
4650
4751 private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L ;
4852
49- static final Duration CHECKPOINT_INTERVAL = Duration .ofMinutes (2 );
53+ static final Duration CHECKPOINT_INTERVAL = Duration .ofMinutes (3 );
5054
5155 private final DynamoDBSourceConfig dynamoDBSourceConfig ;
5256 private final Map <StreamPartition , ConcurrentLinkedQueue <ShardCheckpointStatus >> checkpoints = new ConcurrentHashMap <>();
@@ -79,6 +83,20 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme
7983 executorService .submit (() -> monitorAcknowledgments (stopWorkerConsumer ));
8084 }
8185
86+ @ VisibleForTesting
87+ public ShardAcknowledgementManager (final AcknowledgementSetManager acknowledgementSetManager ,
88+ final EnhancedSourceCoordinator sourceCoordinator ,
89+ final DynamoDBSourceConfig dynamoDBSourceConfig ,
90+ final ExecutorService executorService ) {
91+ this .executorService = executorService ;
92+ this .acknowledgementSetManager = acknowledgementSetManager ;
93+ this .sourceCoordinator = sourceCoordinator ;
94+ this .dynamoDBSourceConfig = dynamoDBSourceConfig ;
95+ this .partitionsToRemove = Collections .synchronizedList (new ArrayList <>());
96+ this .partitionsToGiveUp = Collections .synchronizedList (new ArrayList <>());
97+ this .lastCheckpointTime = Instant .now ();
98+ }
99+
82100 void monitorAcknowledgments (final Consumer <StreamPartition > stopWorkerConsumer ) {
83101 while (!Thread .currentThread ().isInterrupted ()) {
84102 boolean exit = runMonitorAcknowledgmentLoop (stopWorkerConsumer );
@@ -153,13 +171,9 @@ boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerC
153171 sourceCoordinator .saveProgressStateForPartition (streamPartition , dynamoDBSourceConfig .getShardAcknowledgmentTimeout ());
154172 LOG .debug ("Checkpointed shard {} with latest sequence number acknowledged {}" , streamPartition .getShardId (), latestCheckpointForShard .getSequenceNumber ());
155173 }
156- if (partitionsToGiveUp .contains (streamPartition )) {
157- partitionsToRemove .add (streamPartition );
158- sourceCoordinator .giveUpPartition (streamPartition );
159- }
160-
161174 } catch (final Exception e ) {
162- LOG .error ("Received exception while monitoring acknowledgments for stream partition {}" , streamPartition .getPartitionKey (), e );
175+ LOG .error (NOISY , "Received exception while monitoring acknowledgments for stream partition {}, stop processing shard" , streamPartition .getPartitionKey (), e );
176+ markPartitionForRemoval (streamPartition );
163177 }
164178 }
165179
@@ -172,6 +186,13 @@ public AcknowledgementSet createAcknowledgmentSet(
172186 final boolean isFinalSetForPartition ) {
173187 final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber ;
174188 final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus (sequenceNumber , Instant .now ().toEpochMilli (), isFinalSetForPartition );
189+
190+ // Shard should already be in checkpoints map from call to startUpdatingOwnershipForShard, if it is not in the map
191+ // that means that ShardAcknowledgmentManager stopped tracking it due to some error, and another worker will pick it up
192+ // We throw an error in this case to have the ShardConsumer exit and stop reading data from the shard
193+ if (!isStillTrackingShard (streamPartition )) {
194+ throw new ShardNotTrackedException ("The shard {} is not being tracked anymore, stop reading from shard" );
195+ }
175196 checkpoints .computeIfAbsent (streamPartition , segment -> new ConcurrentLinkedQueue <>()).add (shardCheckpointStatus );
176197 ackStatuses .computeIfAbsent (streamPartition , segment -> new ConcurrentHashMap <>());
177198 ackStatuses .get (streamPartition ).put (sequenceNumberNoNull , shardCheckpointStatus );
@@ -199,7 +220,12 @@ void updateOwnershipForAllShardPartitions() {
199220 if (Duration .between (lastCheckpointTime , Instant .now ()).compareTo (CHECKPOINT_INTERVAL ) > 0 ) {
200221 for (final StreamPartition streamPartition : checkpoints .keySet ()) {
201222 if (!partitionsToRemove .contains (streamPartition )) {
202- sourceCoordinator .saveProgressStateForPartition (streamPartition , dynamoDBSourceConfig .getShardAcknowledgmentTimeout ());
223+ try {
224+ sourceCoordinator .saveProgressStateForPartition (streamPartition , dynamoDBSourceConfig .getShardAcknowledgmentTimeout ());
225+ } catch (final PartitionUpdateException e ) {
226+ LOG .warn (NOISY , "Failed to update progress state for shard {}, will stop tracking this shard as someone else owns it" , streamPartition .getShardId ());
227+ markPartitionForRemoval (streamPartition );
228+ }
203229 }
204230 }
205231
@@ -214,9 +240,8 @@ private void handleFailure(final StreamPartition streamPartition,
214240 streamProgressState .setSequenceNumber (latestCheckpointForShard .getSequenceNumber ());
215241 sourceCoordinator .saveProgressStateForPartition (streamPartition , dynamoDBSourceConfig .getShardAcknowledgmentTimeout ());
216242 }
217- partitionsToRemove .add (streamPartition );
218- sourceCoordinator .giveUpPartition (streamPartition );
219- partitionsToGiveUp .remove (streamPartition );
243+
244+ markPartitionForRemoval (streamPartition );
220245 }
221246
222247 private void handleCompletedShard (final StreamPartition streamPartition ) {
@@ -245,13 +270,17 @@ private void removePartitions() {
245270 ackStatuses .remove (streamPartition );
246271 });
247272
273+ partitionsToGiveUp .forEach (sourceCoordinator ::giveUpPartition );
274+
248275 partitionsToRemove .clear ();
276+ partitionsToGiveUp .clear ();
249277 }
250278
251279 public void giveUpPartition (final StreamPartition streamPartition ) {
252- if (! partitionsToGiveUp . contains (streamPartition )) {
280+ if (isStillTrackingShard (streamPartition )) {
253281 LOG .debug ("Adding partition {} to give up list" , streamPartition .getPartitionKey ());
254282 partitionsToGiveUp .add (streamPartition );
283+ partitionsToRemove .add (streamPartition );
255284 }
256285 }
257286
@@ -260,7 +289,24 @@ public boolean isExportDone(StreamPartition streamPartition) {
260289 return globalPartition .isPresent ();
261290 }
262291
263- public void startUpdatingOwnershipForShard (final StreamPartition streamPartition ) {
292+ void startUpdatingOwnershipForShard (final StreamPartition streamPartition ) {
264293 checkpoints .computeIfAbsent (streamPartition , segment -> new ConcurrentLinkedQueue <>());
265294 }
295+
296+ boolean isStillTrackingShard (final StreamPartition streamPartition ) {
297+ return isStillTrackingShardInternal (streamPartition );
298+ }
299+
300+ private boolean isStillTrackingShardInternal (final StreamPartition streamPartition ) {
301+ LOG .info ("partitions to remove: {}" , partitionsToRemove );
302+ LOG .info ("Checkpoints: {}" , checkpoints );
303+ return !partitionsToRemove .contains (streamPartition ) && checkpoints .containsKey (streamPartition );
304+ }
305+
306+ private void markPartitionForRemoval (final StreamPartition streamPartition ) {
307+ if (!partitionsToRemove .contains (streamPartition )) {
308+ partitionsToRemove .add (streamPartition );
309+ partitionsToGiveUp .add (streamPartition );
310+ }
311+ }
266312}
0 commit comments