Skip to content

Commit 2c66d59

Browse files
authored
Add acknowledgment checkpointing with fixes to previous issues (#6002)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent ea11b96 commit 2c66d59

15 files changed

Lines changed: 1161 additions & 319 deletions

File tree

data-prepper-plugins/dynamodb-source/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ dependencies {
2323

2424
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
2525
implementation project(path: ':data-prepper-plugins:buffer-common')
26+
implementation project(path: ':data-prepper-plugins:common')
2627

2728

2829
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
30+
testImplementation project(':data-prepper-test:test-common')
2931
}

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DynamoDBSourceConfig {
3737
private boolean acknowledgments = false;
3838

3939
@JsonProperty("shard_acknowledgment_timeout")
40-
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10);
40+
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(30);
4141

4242
@JsonProperty("s3_data_file_acknowledgment_timeout")
4343
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(15);

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
1010
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
1111

12+
import java.util.Objects;
1213
import java.util.Optional;
1314

1415
public class StreamPartition extends EnhancedSourcePartition<StreamProgressState> {
@@ -61,4 +62,16 @@ public String getStreamArn() {
6162
public String getShardId() {
6263
return shardId;
6364
}
65+
66+
@Override
67+
public boolean equals(Object o) {
68+
if (o == null || getClass() != o.getClass()) return false;
69+
StreamPartition that = (StreamPartition) o;
70+
return Objects.equals(shardId, that.shardId);
71+
}
72+
73+
@Override
74+
public int hashCode() {
75+
return Objects.hashCode(shardId);
76+
}
6477
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.dynamodb.model;
12+
13+
import java.time.Duration;
14+
import java.time.Instant;
15+
16+
public class ShardCheckpointStatus {
17+
private final String sequenceNumber;
18+
19+
private final boolean isFinalAcknowledgmentForPartition;
20+
private AcknowledgmentStatus acknowledgeStatus;
21+
private final long createTimestamp;
22+
private Long acknowledgedTimestamp;
23+
24+
public enum AcknowledgmentStatus {
25+
POSITIVE_ACK,
26+
NEGATIVE_ACK,
27+
NO_ACK
28+
}
29+
30+
public ShardCheckpointStatus(final String sequenceNumber, final long createTimestamp, final boolean isFinalAcknowledgmentForPartition) {
31+
this.sequenceNumber = sequenceNumber;
32+
this.acknowledgeStatus = AcknowledgmentStatus.NO_ACK;
33+
this.createTimestamp = createTimestamp;
34+
this.isFinalAcknowledgmentForPartition = isFinalAcknowledgmentForPartition;
35+
}
36+
37+
public void setAcknowledgedTimestamp(final Long acknowledgedTimestamp) {
38+
this.acknowledgedTimestamp = acknowledgedTimestamp;
39+
}
40+
41+
public void setAcknowledged(final AcknowledgmentStatus acknowledgmentStatus) {
42+
this.acknowledgeStatus = acknowledgmentStatus;
43+
}
44+
45+
public String getSequenceNumber() {
46+
return sequenceNumber;
47+
}
48+
49+
public boolean isPositiveAcknowledgement() {
50+
return this.acknowledgeStatus == AcknowledgmentStatus.POSITIVE_ACK;
51+
}
52+
53+
public boolean isNegativeAcknowledgement() {
54+
return this.acknowledgeStatus == AcknowledgmentStatus.NEGATIVE_ACK;
55+
}
56+
57+
public boolean isFinalAcknowledgmentForPartition() {
58+
return isFinalAcknowledgmentForPartition;
59+
}
60+
61+
public boolean isExpired(final Duration expiredDuration) {
62+
return Duration.between(Instant.ofEpochMilli(createTimestamp), Instant.now()).compareTo(expiredDuration) > 0;
63+
}
64+
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.dynamodb.stream;
12+
13+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
14+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
15+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
16+
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
17+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
18+
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
19+
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
20+
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.time.Duration;
26+
import java.time.Instant;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
import java.util.Optional;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.concurrent.ConcurrentLinkedQueue;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.function.Consumer;
39+
40+
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ShardCheckpointStatus;
41+
42+
public class ShardAcknowledgementManager {
43+
private static final Logger LOG = LoggerFactory.getLogger(ShardAcknowledgementManager.class);
44+
45+
private static final String NULL_SEQUENCE_NUMBER = "null";
46+
47+
private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L;
48+
49+
static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2);
50+
51+
private final DynamoDBSourceConfig dynamoDBSourceConfig;
52+
private final Map<StreamPartition, ConcurrentLinkedQueue<ShardCheckpointStatus>> checkpoints = new ConcurrentHashMap<>();
53+
private final ConcurrentHashMap<StreamPartition, ConcurrentHashMap<String, ShardCheckpointStatus>> ackStatuses = new ConcurrentHashMap<>();
54+
55+
private final AcknowledgementSetManager acknowledgementSetManager;
56+
57+
private final EnhancedSourceCoordinator sourceCoordinator;
58+
59+
private final ExecutorService executorService;
60+
private final List<StreamPartition> partitionsToRemove;
61+
private final List<StreamPartition> partitionsToGiveUp;
62+
private boolean shutdownTriggered;
63+
64+
private Instant lastCheckpointTime;
65+
66+
public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
67+
final EnhancedSourceCoordinator sourceCoordinator,
68+
final DynamoDBSourceConfig dynamoDBSourceConfig,
69+
final Consumer<StreamPartition> stopWorkerConsumer
70+
) {
71+
this.acknowledgementSetManager = acknowledgementSetManager;
72+
this.sourceCoordinator = sourceCoordinator;
73+
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
74+
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
75+
this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>());
76+
this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>());
77+
this.lastCheckpointTime = Instant.now();
78+
79+
executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer));
80+
}
81+
82+
void monitorAcknowledgments(final Consumer<StreamPartition> stopWorkerConsumer) {
83+
while (!Thread.currentThread().isInterrupted()) {
84+
boolean exit = runMonitorAcknowledgmentLoop(stopWorkerConsumer);
85+
if (exit) {
86+
break;
87+
}
88+
89+
try {
90+
Thread.sleep(2000);
91+
} catch (InterruptedException e) {
92+
throw new RuntimeException(e);
93+
}
94+
}
95+
96+
LOG.info("Exiting acknowledgment manager");
97+
}
98+
99+
boolean runMonitorAcknowledgmentLoop(final Consumer<StreamPartition> stopWorkerConsumer) {
100+
removePartitions();
101+
if (shutdownTriggered) {
102+
LOG.info("Shutdown was triggered giving up partitions and exiting cleanly");
103+
for (final StreamPartition streamPartition : checkpoints.keySet()) {
104+
sourceCoordinator.giveUpPartition(streamPartition);
105+
}
106+
return true;
107+
}
108+
109+
for (final StreamPartition streamPartition : checkpoints.keySet()) {
110+
try {
111+
final StreamProgressState streamProgressState = streamPartition.getProgressState().orElseThrow();
112+
final ConcurrentLinkedQueue<ShardCheckpointStatus> checkpointStatuses = checkpoints.get(streamPartition);
113+
ShardCheckpointStatus latestCheckpointForShard = null;
114+
boolean gaveUpPartition = false;
115+
while (!checkpointStatuses.isEmpty()) {
116+
updateOwnershipForAllShardPartitions();
117+
118+
if (checkpointStatuses.peek().isPositiveAcknowledgement()) {
119+
latestCheckpointForShard = checkpointStatuses.poll();
120+
if (latestCheckpointForShard != null) {
121+
ackStatuses.get(streamPartition).remove(latestCheckpointForShard.getSequenceNumber());
122+
}
123+
} else if (checkpointStatuses.peek().isNegativeAcknowledgement()
124+
|| checkpointStatuses.peek().isExpired(dynamoDBSourceConfig.getShardAcknowledgmentTimeout())) {
125+
handleFailure(streamPartition, streamProgressState, latestCheckpointForShard);
126+
gaveUpPartition = true;
127+
128+
if (checkpointStatuses.peek().isNegativeAcknowledgement()) {
129+
LOG.warn("Received negative acknowledgment for partition {} with sequence number {}, giving up partition",
130+
streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber());
131+
} else {
132+
LOG.warn("Acknowledgment timed out for partition {} with sequence number {}, giving up partition",
133+
streamPartition.getPartitionKey(), checkpointStatuses.peek().getSequenceNumber());
134+
}
135+
break;
136+
} else {
137+
break;
138+
}
139+
}
140+
141+
if (!gaveUpPartition) {
142+
updateOwnershipForAllShardPartitions();
143+
}
144+
145+
if (gaveUpPartition || latestCheckpointForShard == null) {
146+
continue;
147+
}
148+
149+
if (latestCheckpointForShard.isFinalAcknowledgmentForPartition()) {
150+
handleCompletedShard(streamPartition);
151+
} else {
152+
streamProgressState.setSequenceNumber(Objects.equals(latestCheckpointForShard.getSequenceNumber(), NULL_SEQUENCE_NUMBER) ? null : latestCheckpointForShard.getSequenceNumber());
153+
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
154+
LOG.debug("Checkpointed shard {} with latest sequence number acknowledged {}", streamPartition.getShardId(), latestCheckpointForShard.getSequenceNumber());
155+
}
156+
if (partitionsToGiveUp.contains(streamPartition)) {
157+
partitionsToRemove.add(streamPartition);
158+
sourceCoordinator.giveUpPartition(streamPartition);
159+
}
160+
161+
} catch (final Exception e) {
162+
LOG.error("Received exception while monitoring acknowledgments for stream partition {}", streamPartition.getPartitionKey(), e);
163+
}
164+
}
165+
166+
return false;
167+
}
168+
169+
public AcknowledgementSet createAcknowledgmentSet(
170+
final StreamPartition streamPartition,
171+
final String sequenceNumber,
172+
final boolean isFinalSetForPartition) {
173+
final String sequenceNumberNoNull = sequenceNumber == null ? NULL_SEQUENCE_NUMBER : sequenceNumber;
174+
final ShardCheckpointStatus shardCheckpointStatus = new ShardCheckpointStatus(sequenceNumber, Instant.now().toEpochMilli(), isFinalSetForPartition);
175+
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>()).add(shardCheckpointStatus);
176+
ackStatuses.computeIfAbsent(streamPartition, segment -> new ConcurrentHashMap<>());
177+
ackStatuses.get(streamPartition).put(sequenceNumberNoNull, shardCheckpointStatus);
178+
179+
return acknowledgementSetManager.create((result) -> {
180+
if (ackStatuses.containsKey(streamPartition) && ackStatuses.get(streamPartition).containsKey(sequenceNumberNoNull)) {
181+
final ShardCheckpointStatus ackCheckpointStatus = ackStatuses.get(streamPartition).get(sequenceNumberNoNull);
182+
183+
ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
184+
185+
if (result) {
186+
LOG.debug("Received acknowledgment of completion from sink for partition {} with sequence number {}",
187+
streamPartition.getPartitionKey(), sequenceNumberNoNull);
188+
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
189+
} else {
190+
LOG.warn("Negative acknowledgment received for partition {} with sequence number {}",
191+
streamPartition.getPartitionKey(), sequenceNumberNoNull);
192+
ackCheckpointStatus.setAcknowledged(ShardCheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
193+
}
194+
}
195+
}, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
196+
}
197+
198+
void updateOwnershipForAllShardPartitions() {
199+
if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) {
200+
for (final StreamPartition streamPartition : checkpoints.keySet()) {
201+
if (!partitionsToRemove.contains(streamPartition)) {
202+
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
203+
}
204+
}
205+
206+
lastCheckpointTime = Instant.now();
207+
}
208+
}
209+
210+
private void handleFailure(final StreamPartition streamPartition,
211+
final StreamProgressState streamProgressState,
212+
final ShardCheckpointStatus latestCheckpointForShard) {
213+
if (latestCheckpointForShard != null) {
214+
streamProgressState.setSequenceNumber(latestCheckpointForShard.getSequenceNumber());
215+
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
216+
}
217+
partitionsToRemove.add(streamPartition);
218+
sourceCoordinator.giveUpPartition(streamPartition);
219+
partitionsToGiveUp.remove(streamPartition);
220+
}
221+
222+
private void handleCompletedShard(final StreamPartition streamPartition) {
223+
sourceCoordinator.completePartition(streamPartition);
224+
partitionsToRemove.add(streamPartition);
225+
partitionsToGiveUp.remove(streamPartition);
226+
LOG.info("Received all acknowledgments for partition {}, marking partition as completed", streamPartition.getPartitionKey());
227+
}
228+
229+
public void shutdown() {
230+
shutdownTriggered = true;
231+
executorService.shutdown();
232+
try {
233+
if (!executorService.awaitTermination(WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT, TimeUnit.MINUTES)) {
234+
executorService.shutdownNow();
235+
}
236+
} catch (InterruptedException e) {
237+
executorService.shutdownNow();
238+
Thread.currentThread().interrupt();
239+
}
240+
}
241+
242+
private void removePartitions() {
243+
partitionsToRemove.forEach(streamPartition -> {
244+
checkpoints.remove(streamPartition);
245+
ackStatuses.remove(streamPartition);
246+
});
247+
248+
partitionsToRemove.clear();
249+
}
250+
251+
public void giveUpPartition(final StreamPartition streamPartition) {
252+
if (!partitionsToGiveUp.contains(streamPartition)) {
253+
LOG.debug("Adding partition {} to give up list", streamPartition.getPartitionKey());
254+
partitionsToGiveUp.add(streamPartition);
255+
}
256+
}
257+
258+
public boolean isExportDone(StreamPartition streamPartition) {
259+
Optional<EnhancedSourcePartition> globalPartition = sourceCoordinator.getPartition(streamPartition.getStreamArn());
260+
return globalPartition.isPresent();
261+
}
262+
263+
public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
264+
checkpoints.computeIfAbsent(streamPartition, segment -> new ConcurrentLinkedQueue<>());
265+
}
266+
}

0 commit comments

Comments
 (0)