Skip to content

Commit 4130fbf

Browse files
author
Jonah Calvo
committed
Update tracking of partitions to give up
1 parent afa8d06 commit 4130fbf

2 files changed

Lines changed: 18 additions & 5 deletions

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class ShardAcknowledgementManager {
4848

4949
private final ExecutorService executorService;
5050
private final List<StreamPartition> partitionsToRemove;
51+
private final List<StreamPartition> partitionsToGiveUp;
5152
private boolean shutdownTriggered;
5253

5354
private Instant lastCheckpointTime;
@@ -62,6 +63,7 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme
6263
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
6364
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
6465
this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>());
66+
this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>());
6567
this.lastCheckpointTime = Instant.now();
6668

6769
// Start monitoring acknowledgments in the constructor
@@ -132,6 +134,11 @@ boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerC
132134
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
133135
LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber());
134136
}
137+
if (partitionsToGiveUp.contains(streamPartition)) {
138+
partitionsToRemove.add(streamPartition);
139+
sourceCoordinator.giveUpPartition(streamPartition);
140+
}
141+
135142
} catch (final Exception e) {
136143
LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e);
137144
}
@@ -190,11 +197,13 @@ private void handleFailure(final StreamPartition streamPartition,
190197
}
191198
partitionsToRemove.add(streamPartition);
192199
sourceCoordinator.giveUpPartition(streamPartition);
200+
partitionsToGiveUp.remove(streamPartition);
193201
}
194202

195203
private void handleCompletedShard(final StreamPartition streamPartition) {
196204
sourceCoordinator.completePartition(streamPartition);
197205
partitionsToRemove.add(streamPartition);
206+
partitionsToGiveUp.remove(streamPartition);
198207
LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey());
199208
}
200209

@@ -221,9 +230,10 @@ private void removePartitions() {
221230
}
222231

223232
public void giveUpPartition(final StreamPartition streamPartition) {
224-
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
225-
checkpoints.remove(streamPartition);
226-
ackStatuses.remove(streamPartition);
233+
if (!partitionsToGiveUp.contains(streamPartition)) {
234+
LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
235+
partitionsToGiveUp.add(streamPartition);
236+
}
227237
}
228238

229239
public boolean isExportDone(StreamPartition streamPartition) {

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,11 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
151151
}
152152
} else {
153153
LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex);
154-
shardAcknowledgementManager.giveUpPartition(streamPartition);
155-
coordinator.giveUpPartition(streamPartition);
154+
if (dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
155+
shardAcknowledgementManager.giveUpPartition(streamPartition);
156+
} else {
157+
coordinator.giveUpPartition(streamPartition);
158+
}
156159
}
157160
};
158161
}

0 commit comments

Comments
 (0)