Skip to content

Commit bb7de17

Browse files
author
Jonah Calvo
committed
Performance improvements of acknowledgements manager in DDB source
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
1 parent b8ffadb commit bb7de17

3 files changed

Lines changed: 22 additions & 16 deletions

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class ShardAcknowledgementManager {
4646

4747
private static final long WAIT_FOR_ACKNOWLEDGMENTS_TIMEOUT = 10L;
4848

49-
static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(2);
49+
static final long CHECKPOINT_INTERVAL = Duration.ofMinutes(2).toMillis();
5050

5151
private final DynamoDBSourceConfig dynamoDBSourceConfig;
5252
private final Map<StreamPartition, ConcurrentLinkedQueue<ShardCheckpointStatus>> checkpoints = new ConcurrentHashMap<>();
@@ -61,7 +61,7 @@ public class ShardAcknowledgementManager {
6161
private final List<StreamPartition> partitionsToGiveUp;
6262
private boolean shutdownTriggered;
6363

64-
private Instant lastCheckpointTime;
64+
private long lastCheckpointTime;
6565

6666
public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager,
6767
final EnhancedSourceCoordinator sourceCoordinator,
@@ -72,9 +72,9 @@ public ShardAcknowledgementManager(final AcknowledgementSetManager acknowledgeme
7272
this.sourceCoordinator = sourceCoordinator;
7373
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
7474
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("dynamodb-shard-ack-monitor"));
75-
this.partitionsToRemove = Collections.synchronizedList(new ArrayList<>());
75+
this.partitionsToRemove = new ArrayList<>();
7676
this.partitionsToGiveUp = Collections.synchronizedList(new ArrayList<>());
77-
this.lastCheckpointTime = Instant.now();
77+
this.lastCheckpointTime = System.currentTimeMillis();
7878

7979
executorService.submit(() -> monitorAcknowledgments(stopWorkerConsumer));
8080
}
@@ -85,6 +85,12 @@ void monitorAcknowledgments(final Consumer<StreamPartition> stopWorkerConsumer)
8585
if (exit) {
8686
break;
8787
}
88+
try {
89+
// Idle between loops to save on CPU
90+
Thread.sleep(300);
91+
} catch (InterruptedException e) {
92+
throw new RuntimeException(e);
93+
}
8894
}
8995

9096
LOG.info("Exiting acknowledgment manager");
@@ -189,14 +195,14 @@ public AcknowledgementSet createAcknowledgmentSet(
189195
}
190196

191197
void updateOwnershipForAllShardPartitions() {
192-
if (Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) > 0) {
198+
if (System.currentTimeMillis() - lastCheckpointTime > CHECKPOINT_INTERVAL) {
193199
for (final StreamPartition streamPartition : checkpoints.keySet()) {
194200
if (!partitionsToRemove.contains(streamPartition)) {
195201
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
196202
}
197203
}
198204

199-
lastCheckpointTime = Instant.now();
205+
lastCheckpointTime = System.currentTimeMillis();
200206
}
201207
}
202208

@@ -233,12 +239,13 @@ public void shutdown() {
233239
}
234240

235241
private void removePartitions() {
236-
partitionsToRemove.forEach(streamPartition -> {
242+
for (final StreamPartition streamPartition : partitionsToRemove) {
237243
checkpoints.remove(streamPartition);
238244
ackStatuses.remove(streamPartition);
239-
});
240-
241-
partitionsToRemove.clear();
245+
}
246+
if (!partitionsToRemove.isEmpty()){
247+
partitionsToRemove.clear();
248+
}
242249
}
243250

244251
public void giveUpPartition(final StreamPartition streamPartition) {

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,12 @@ public void run() {
263263
LOG.debug("Reached end of shard");
264264
break;
265265
}
266-
267-
if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
268-
LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId);
269-
if (shardAcknowledgementManager == null) {
266+
if (shardAcknowledgementManager == null) {
267+
if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
268+
LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId);
270269
checkpointer.checkpoint(sequenceNumber);
270+
lastCheckpointTime = System.currentTimeMillis();
271271
}
272-
lastCheckpointTime = System.currentTimeMillis();
273272
}
274273

275274
GetRecordsResponse response = callGetRecords(shardIterator);

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ void testUpdateOwnershipForAllShardPartitions() throws Exception {
107107

108108
// Set lastCheckpointTime to past to trigger checkpoint interval
109109
setField(ShardAcknowledgementManager.class, shardAcknowledgementManager,
110-
"lastCheckpointTime", Instant.now().minus(Duration.ofMinutes(5)));
110+
"lastCheckpointTime", System.currentTimeMillis() - Duration.ofMinutes(5).toMillis());
111111

112112
// Call updateOwnershipForAllShardPartitions directly
113113
shardAcknowledgementManager.updateOwnershipForAllShardPartitions();

0 commit comments

Comments
 (0)