Skip to content

Commit dfc3c70

Browse files
author
Jonah Calvo
authored
Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class (#5818)
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
1 parent aff23af commit dfc3c70

12 files changed

Lines changed: 745 additions & 320 deletions

File tree

data-prepper-plugins/dynamodb-source/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ dependencies {
2323

2424
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
2525
implementation project(path: ':data-prepper-plugins:buffer-common')
26+
implementation project(path: ':data-prepper-plugins:common')
2627

2728

2829
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
30+
testImplementation project(':data-prepper-test:test-common')
2931
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DynamoDBSourceConfig {
3737
private boolean acknowledgments = false;
3838

3939
@JsonProperty("shard_acknowledgment_timeout")
40-
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10);
40+
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(30);
4141

4242
@JsonProperty("s3_data_file_acknowledgment_timeout")
4343
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(15);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.dynamodb.model;
12+
13+
import java.time.Duration;
14+
import java.time.Instant;
15+
16+
public class ShardCheckpointStatus {
17+
private final String sequenceNumber;
18+
19+
private final boolean isFinalAcknowledgmentForPartition;
20+
private AcknowledgmentStatus acknowledgeStatus;
21+
private final long createTimestamp;
22+
private Long acknowledgedTimestamp;
23+
24+
public enum AcknowledgmentStatus {
25+
POSITIVE_ACK,
26+
NEGATIVE_ACK,
27+
NO_ACK
28+
}
29+
30+
public ShardCheckpointStatus(final String sequenceNumber, final long createTimestamp, final boolean isFinalAcknowledgmentForPartition) {
31+
this.sequenceNumber = sequenceNumber;
32+
this.acknowledgeStatus = AcknowledgmentStatus.NO_ACK;
33+
this.createTimestamp = createTimestamp;
34+
this.isFinalAcknowledgmentForPartition = isFinalAcknowledgmentForPartition;
35+
}
36+
37+
public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
38+
this.acknowledgedTimestamp = acknowledgedTimestamp;
39+
}
40+
41+
public void setAcknowledged(final AcknowledgmentStatus acknowledgmentStatus) {
42+
this.acknowledgeStatus = acknowledgmentStatus;
43+
}
44+
45+
public String getSequenceNumber() {
46+
return sequenceNumber;
47+
}
48+
49+
public boolean isPositiveAcknowledgement() {
50+
return this.acknowledgeStatus == AcknowledgmentStatus.POSITIVE_ACK;
51+
}
52+
53+
public boolean isNegativeAcknowledgement() {
54+
return this.acknowledgeStatus == AcknowledgmentStatus.NEGATIVE_ACK;
55+
}
56+
57+
public boolean isFinalAcknowledgmentForPartition() {
58+
return isFinalAcknowledgmentForPartition;
59+
}
60+
61+
public boolean isExpired(final Duration expiredDuration) {
62+
return Duration.between(Instant.ofEpochMilli(createTimestamp), Instant.now()).compareTo(expiredDuration) > 0;
63+
}
64+
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.dynamodb.stream;
12+
13+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
14+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
15+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
16+
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
17+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
18+
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
19+
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
20+
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.time.Duration;
26+
import java.time.Instant;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
import java.util.Optional;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.concurrent.ConcurrentLinkedQueue;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.function.Consumer;
39+
40+
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ShardCheckpointStatus;
41+
42+
public class ShardAcknowledgementManager {
43+
private static final Logger LOG = LoggerFactory.getLogger(ShardAcknowledgementManager.class);
44+
45+
private static final String NULL_SEQUENCE_NUMBER = "null";
46+
47+
private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L;
48+
49+
static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2);
50+
51+
private final DynamoDBSourceConfig dynamoDBSourceConfig;
52+
private final Map<StreamPartition, ConcurrentLinkedQueue<ShardCheckpointStatus>> checkpoints = new ConcurrentHashMap<>();
53+
private final ConcurrentHashMap<StreamPartition, ConcurrentHashMap<String, ShardCheckpointStatus>> ackStatuses = new ConcurrentHashMap<>();
54+
55+
private final AcknowledgementSetManager acknowledgementSetManager;
56+
57+
private final EnhancedSourceCoordinator sourceCoordinator;
58+
59+
private final ExecutorService executorService;
60+
private final List<StreamPartition> partitionsToRemove;
61+
private final List<StreamPartition> partitionsToGiveUp;
62+
private boolean shutdownTriggered;
63+
64+
private Instant lastCheckpointTime;
65+
66+
public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
67+
final EnhancedSourceCoordinator sourceCoordinator,
68+
final DynamoDBSourceConfig dynamoDBSourceConfig,
69+
final Consumer<StreamPartition> stopWorkerConsumer
70+
) {
71+
this.acknowledgementSetManager = acknowledgementSetManager;
72+
this.sourceCoordinator = sourceCoordinator;
73+
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
74+
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
75+
this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>());
76+
this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>());
77+
this.lastCheckpointTime = Instant.now();
78+
79+
executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer));
80+
}
81+
82+
void monitorAcknowledgments(final Consumer<StreamPartition> stopWorkerConsumer) {
83+
while (!Thread.currentThread().isInterrupted()) {
84+
boolean exit = runMonitorAcknowledgmentLoop(stopWorkerConsumer);
85+
if (exit) {
86+
break;
87+
}
88+
}
89+
90+
LOG.info("Exiting acknowledgment manager");
91+
}
92+
93+
boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerConsumer) {
94+
removePartitions();
95+
if (shutdownTriggered) {
96+
LOG.info("Shutdown was triggered giving up partitions and exiting cleanly");
97+
for (final StreamPartition streamPartition : checkpoints.keySet()) {
98+
sourceCoordinator.giveUpPartition(streamPartition);
99+
}
100+
return true;
101+
}
102+
103+
for (final StreamPartition streamPartition : checkpoints.keySet()) {
104+
try {
105+
final StreamProgressState streamProgressState = streamPartition.getProgressState().orElseThrow();
106+
final ConcurrentLinkedQueue<ShardCheckpointStatus> checkpointStatuses = checkpoints.get(streamPartition);
107+
ShardCheckpointStatus latestCheckpointForShard = null;
108+
boolean gaveUpPartition = false;
109+
while (!checkpointStatuses.isEmpty()) {
110+
updateOwnershipForAllShardPartitions();
111+
112+
if (checkpointStatuses.peek().isPositiveAcknowledgement()) {
113+
latestCheckpointForShard = checkpointStatuses.poll();
114+
} else if (checkpointStatuses.peek().isNegativeAcknowledgement()
115+
|| checkpointStatuses.peek().isExpired(dynamoDBSourceConfig.getShardAcknowledgmentTimeout())) {
116+
handleFailure(streamPartition, streamProgressState, latestCheckpointForShard);
117+
gaveUpPartition = true;
118+
119+
if (checkpointStatuses.peek().isNegativeAcknowledgement()) {
120+
LOG.warn("Received negative acknowledgment for partition {} with sequence number {}, giving up partition",
121+
streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber());
122+
} else {
123+
LOG.warn("Acknowledgment timed out for partition {} with sequence number {}, giving up partition",
124+
streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber());
125+
}
126+
127+
stopWorkerConsumer.accept(streamPartition);
128+
break;
129+
} else {
130+
break;
131+
}
132+
}
133+
134+
if (!gaveUpPartition) {
135+
updateOwnershipForAllShardPartitions();
136+
}
137+
138+
if (gaveUpPartition || latestCheckpointForShard == null) {
139+
continue;
140+
}
141+
142+
if (latestCheckpointForShard.isFinalAcknowledgmentForPartition()) {
143+
handleCompletedShard(streamPartition);
144+
} else {
145+
streamProgressState.setSequenceNumber(Objects.equals(latestCheckpointForShard.getSequenceNumber(), NULL_SEQUENCE_NUMBER) ? null : latestCheckpointForShard.getSequenceNumber());
146+
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
147+
LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber());
148+
}
149+
if (partitionsToGiveUp.contains(streamPartition)) {
150+
partitionsToRemove.add(streamPartition);
151+
sourceCoordinator.giveUpPartition(streamPartition);
152+
}
153+
154+
} catch (final Exception e) {
155+
LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e);
156+
}
157+
}
158+
159+
return false;
160+
}
161+
162+
public AcknowledgementSet createAcknowledgmentSet(
163+
final StreamPartition streamPartition,
164+
final String sequenceNumber,
165+
final boolean isFinalSetForPartition) {
166+
final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber;
167+
final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus(sequenceNumber, Instant.now().toEpochMilli(), isFinalSetForPartition);
168+
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()).add(shardCheckpointStatus);
169+
ackStatuses.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>());
170+
ackStatuses.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus);
171+
172+
return acknowledgementSetManager.create((result) -> {
173+
if (ackStatuses.containsKey(streamPartition) && ackStatuses.get(streamPartition).containsKey(sequenceNumberNoNull)) {
174+
final ShardCheckpointStatus ackCheckpointStatus = ackStatuses.get(streamPartition).get(sequenceNumberNoNull);
175+
176+
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
177+
178+
if (result) {
179+
LOG.debug("Received acknowledgment of completion from sink for partition {} with sequence number {}",
180+
streamPartition.getPartitionKey(), sequenceNumberNoNull);
181+
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
182+
} else {
183+
LOG.warn("Negative acknowledgment received for partition {} with sequence number {}",
184+
streamPartition.getPartitionKey(), sequenceNumberNoNull);
185+
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
186+
}
187+
}
188+
}, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
189+
}
190+
191+
void updateOwnershipForAllShardPartitions() {
192+
if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) {
193+
for (final StreamPartition streamPartition : checkpoints.keySet()) {
194+
if (!partitionsToRemove.contains(streamPartition)) {
195+
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
196+
}
197+
}
198+
199+
lastCheckpointTime = Instant.now();
200+
}
201+
}
202+
203+
private void handleFailure(final StreamPartition streamPartition,
204+
final StreamProgressState streamProgressState,
205+
final ShardCheckpointStatus latestCheckpointForShard) {
206+
if (latestCheckpointForShard != null) {
207+
streamProgressState.setSequenceNumber(latestCheckpointForShard.getSequenceNumber());
208+
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
209+
}
210+
partitionsToRemove.add(streamPartition);
211+
sourceCoordinator.giveUpPartition(streamPartition);
212+
partitionsToGiveUp.remove(streamPartition);
213+
}
214+
215+
private void handleCompletedShard(final StreamPartition streamPartition) {
216+
sourceCoordinator.completePartition(streamPartition);
217+
partitionsToRemove.add(streamPartition);
218+
partitionsToGiveUp.remove(streamPartition);
219+
LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey());
220+
}
221+
222+
public void shutdown() {
223+
shutdownTriggered = true;
224+
executorService.shutdown();
225+
try {
226+
if (!executorService.awaitTermination(WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT, TimeUnit.MINUTES)) {
227+
executorService.shutdownNow();
228+
}
229+
} catch (InterruptedException e) {
230+
executorService.shutdownNow();
231+
Thread.currentThread().interrupt();
232+
}
233+
}
234+
235+
private void removePartitions() {
236+
partitionsToRemove.forEach(streamPartition -> {
237+
checkpoints.remove(streamPartition);
238+
ackStatuses.remove(streamPartition);
239+
});
240+
241+
partitionsToRemove.clear();
242+
}
243+
244+
public void giveUpPartition(final StreamPartition streamPartition) {
245+
if (!partitionsToGiveUp.contains(streamPartition)) {
246+
LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
247+
partitionsToGiveUp.add(streamPartition);
248+
}
249+
}
250+
251+
public boolean isExportDone(StreamPartition streamPartition) {
252+
Optional<EnhancedSourcePartition> globalPartition = sourceCoordinator.getPartition(streamPartition.getStreamArn());
253+
return globalPartition.isPresent();
254+
}
255+
256+
public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
257+
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>());
258+
}
259+
}

0 commit comments

Comments
 (0)