Skip to content

Commit 409c818

Browse files
authored
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com> (opensearch-project#5621)
Adding retries and backoff to handle delayed sync between the GSI and the primary table when DDB is used as the Source Coordination Store
1 parent 0ba7ab2 commit 409c818

4 files changed

Lines changed: 46 additions & 8 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,17 @@ public interface SourceCoordinator<T> {
130130
void giveUpPartition(final String partitionKey, final Instant priorityTimestamp);
131131

132132

133+
/**
134+
* Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on partitions,
135+
* or can be called to give up ownership of its partitions in order to pick up new ones with {@link #getNextPartition(Function)}.
136+
* @param partitionKey - Key used as the partition key.
137+
* @param priorityTimestamp - A timestamp that will determine the order that UNASSIGNED partitions are acquired after they are given up.
138+
* @param maxRetries - The maximum number of attempts made to give up the partition (the first attempt included) before an exception is thrown
139+
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition could not be given up due to some failure
140+
* @since 2.12
141+
*/
142+
void giveUpPartition(final String partitionKey, final Instant priorityTimestamp,final Integer maxRetries);
143+
133144
/**
134145
* Should be called by the source after when acknowledgments are enabled to keep ownership of the partition for acknowledgmentTimeout amount of time
135146
* before another instance of Data Prepper can pick it up for processing. Allows the source to acquire another partition immediately for processing

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,31 @@ public void giveUpPartition(final String partitionKey, final Instant priorityTim
351351
giveUpPartitionInternal(partitionKey, priorityTimestamp);
352352
}
353353

354+
@Override
355+
public void giveUpPartition(final String partitionKey, final Instant priorityTimestamp,final Integer maxRetries) {
356+
final long backoffMillis = 200;
357+
for (int attempt = 1; attempt <= maxRetries; attempt++) {
358+
try {
359+
giveUpPartitionInternal(partitionKey, priorityTimestamp);
360+
return;
361+
} catch (PartitionNotOwnedException e) {
362+
if (attempt == maxRetries) {
363+
throw e;
364+
}
365+
LOG.warn("Partition {} not owned on attempt {}/{}. Will retry giving up the partition", partitionKey,
366+
attempt, maxRetries);
367+
}
368+
369+
try {
370+
Thread.sleep(backoffMillis);
371+
} catch (InterruptedException ie) {
372+
Thread.currentThread().interrupt();
373+
LOG.warn("Interrupted while retrying giveUpPartition for partition {} with message {}", partitionKey, ie);
374+
throw new RuntimeException(ie);
375+
}
376+
}
377+
}
378+
354379
private void giveUpPartitionInternal(final String partitionKey, final Instant priorityTimestamp) {
355380
if (!initialized) {
356381
return;

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class ScanObjectWorker implements Runnable {
5656

5757
private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class);
5858
private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1;
59-
59+
static final Integer MAX_RETRIES = 5;
6060
static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2);
6161

6262
static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1);
@@ -215,7 +215,7 @@ private void startProcessingObject(final long waitTimeMillis) {
215215
processFolderPartition(objectToProcess.get());
216216
} catch (final Exception e) {
217217
LOG.error("An exception occurred while processing folder partition {}, giving up this partition", objectToProcess.get().getPartitionKey(), e);
218-
sourceCoordinator.giveUpPartition(objectToProcess.get().getPartitionKey(), Instant.now());
218+
sourceCoordinator.giveUpPartition(objectToProcess.get().getPartitionKey(), Instant.now(), MAX_RETRIES);
219219
partitionKeys.remove(objectToProcess.get().getPartitionKey());
220220
}
221221
return;
@@ -323,7 +323,7 @@ private void processFolderPartition(final SourcePartition<S3SourceProgressState>
323323
return;
324324
}
325325
LOG.debug("No objects to process, giving up partition");
326-
sourceCoordinator.giveUpPartition(folderPartition.getPartitionKey(), Instant.now());
326+
sourceCoordinator.giveUpPartition(folderPartition.getPartitionKey(), Instant.now(), MAX_RETRIES);
327327
return;
328328
}
329329

@@ -441,7 +441,7 @@ private AcknowledgementSet createAcknowledgmentSetForFolderPartition(final Sourc
441441
objectsToDeleteForAcknowledgmentSets.remove(acknowledgmentSetId);
442442
partitionKeys.remove(folderPartition.getPartitionKey());
443443
LOG.info("Received all acknowledgments for folder partition {}, giving up this partition", folderPartition.getPartitionKey());
444-
sourceCoordinator.giveUpPartition(folderPartition.getPartitionKey(), Instant.now());
444+
sourceCoordinator.giveUpPartition(folderPartition.getPartitionKey(), Instant.now(), MAX_RETRIES);
445445
}
446446
}, acknowledgmentSetTimeout);
447447
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY;
7575
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;
7676
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.CHECKPOINT_OWNERSHIP_INTERVAL;
77+
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.MAX_RETRIES;
7778
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION;
7879
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION;
7980
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.PARTITION_OWNERSHIP_UPDATE_ERRORS;
@@ -523,14 +524,15 @@ void processing_with_folder_partitions_with_no_objects_gives_up_that_partition()
523524
when(listObjectsV2Response.contents()).thenReturn(Collections.emptyList());
524525

525526
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response);
526-
doNothing().when(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class));
527+
doNothing().when(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class),
528+
eq(MAX_RETRIES));
527529
doNothing().when(sourceCoordinator).saveProgressStateForPartition(eq(partitionKey), any(S3SourceProgressState.class));
528530

529531
final ScanObjectWorker scanObjectWorker = createObjectUnderTest();
530532
scanObjectWorker.runWithoutInfiniteLoop();
531533

532534
verify(sourceCoordinator).saveProgressStateForPartition(eq(partitionKey), any(S3SourceProgressState.class));
533-
verify(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class));
535+
verify(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class), eq(MAX_RETRIES));
534536

535537
final ArgumentCaptor<ListObjectsV2Request> listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
536538

@@ -653,7 +655,7 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th
653655
secondAckCallback.accept(true);
654656

655657
inOrder.verify(s3ObjectDeleteWorker).deleteS3Object(secondObjectDeleteRequest);
656-
inOrder.verify(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class));
658+
inOrder.verify(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class), eq(MAX_RETRIES));
657659
}
658660

659661
@Test
@@ -727,7 +729,7 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje
727729
ackCallback.accept(true);
728730

729731
inOrder.verify(s3ObjectDeleteWorker).deleteS3Object(firstObjectDeleteRequest);
730-
inOrder.verify(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class));
732+
inOrder.verify(sourceCoordinator).giveUpPartition(eq(partitionKey), any(Instant.class), any(Integer.class));
731733
}
732734

733735
@Test

0 commit comments

Comments
 (0)