Skip to content

Commit bf3aa85

Browse files
committed
Catch when no object exists and mark as completed in s3 scan
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 89e8f39 commit bf3aa85

2 files changed

Lines changed: 26 additions & 1 deletion

File tree

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.slf4j.LoggerFactory;
2121
import software.amazon.awssdk.services.s3.S3Client;
2222
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
23+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
2324

2425
import java.io.IOException;
2526
import java.time.Duration;
@@ -167,6 +168,9 @@ private void startProcessingObject(final int waitTimeMillis) {
167168
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey());
168169
deleteObjectRequest.ifPresent(s3ObjectDeleteWorker::deleteS3Object);
169170
}
171+
} catch (final NoSuchKeyException e) {
172+
LOG.warn("Object {} from bucket {} could not be found, marking this object as complete and continuing processing", objectKey, bucket);
173+
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey());
170174
} catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) {
171175
LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage());
172176
sourceCoordinator.giveUpPartitions();

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
2828
import software.amazon.awssdk.services.s3.S3Client;
2929
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
30+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
3031

3132
import java.io.IOException;
3233
import java.time.Duration;
@@ -47,10 +48,10 @@
4748
import static org.mockito.Mockito.doAnswer;
4849
import static org.mockito.Mockito.doNothing;
4950
import static org.mockito.Mockito.doThrow;
50-
import static org.mockito.Mockito.when;
5151
import static org.mockito.Mockito.verify;
5252
import static org.mockito.Mockito.verifyNoInteractions;
5353
import static org.mockito.Mockito.verifyNoMoreInteractions;
54+
import static org.mockito.Mockito.when;
5455
import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;
5556

5657
@ExtendWith(MockitoExtension.class)
@@ -275,6 +276,26 @@ void getNextPartition_supplier_is_expected_partitionCreationSupplier() {
275276
objectUnderTest.runWithoutInfiniteLoop();
276277
}
277278

279+
@Test
280+
void partitionIsCompleted_when_NoObjectKeyException_is_thrown_from_process_object() throws IOException {
281+
final String bucket = UUID.randomUUID().toString();
282+
final String objectKey = UUID.randomUUID().toString();
283+
final String partitionKey = bucket + "|" + objectKey;
284+
285+
286+
final SourcePartition<S3SourceProgressState> partitionToProcess = SourcePartition.builder(S3SourceProgressState.class).withPartitionKey(partitionKey).build();
287+
288+
given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess));
289+
290+
final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
291+
doThrow(NoSuchKeyException.class).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey));
292+
doNothing().when(sourceCoordinator).completePartition(partitionKey);
293+
294+
createObjectUnderTest().runWithoutInfiniteLoop();
295+
296+
verifyNoMoreInteractions(sourceCoordinator);
297+
}
298+
278299
static Stream<Class> exceptionProvider() {
279300
return Stream.of(PartitionUpdateException.class, PartitionNotFoundException.class, PartitionNotOwnedException.class);
280301
}

0 commit comments

Comments
 (0)