Skip to content

Commit c5ecaae

Browse files
committed
Add read failure metric to S3 source
Signed-off-by: Manuel Mangas Zurita <mzurita@amazon.com>
1 parent 1b4979c commit c5ecaae

6 files changed

Lines changed: 58 additions & 8 deletions

File tree

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public class S3ObjectPluginMetrics {
1313
static final String S3_OBJECTS_SIZE_PROCESSED = "s3ObjectProcessedBytes";
1414
static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
1515
static final String S3_OBJECTS_DELETE_FAILED_METRIC_NAME = "s3ObjectsDeleteFailed";
16+
static final String S3_OBJECTS_READ_FAILED_METRIC_NAME = "s3ObjectReadFailed";
1617
static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
1718
static final String S3_OBJECTS_EVENTS = "s3ObjectsEvents";
1819
static final String S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME = "s3ObjectsNotFound";
@@ -33,6 +34,7 @@ public class S3ObjectPluginMetrics {
3334
private final Counter s3ObjectNoRecordsFound;
3435

3536
private final Counter s3ObjectsDeleteFailed;
37+
private final Counter s3ObjectReadFailedCounter;
3638

3739
public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){
3840
s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
@@ -46,6 +48,7 @@ public S3ObjectPluginMetrics(final PluginMetrics pluginMetrics){
4648
s3ObjectEventsSummary = pluginMetrics.summary(S3_OBJECTS_EVENTS);
4749
s3ObjectNoRecordsFound = pluginMetrics.counter(S3_OBJECTS_NO_RECORDS_FOUND);
4850
s3ObjectsDeleteFailed = pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME);
51+
s3ObjectReadFailedCounter = pluginMetrics.counter(S3_OBJECTS_READ_FAILED_METRIC_NAME);
4952
}
5053

5154
public Counter getS3ObjectsFailedCounter() {
@@ -89,4 +92,6 @@ public Counter getS3ObjectsThrottledCounter() {
8992
}
9093

9194
public Counter getS3ObjectsDeleteFailed() { return s3ObjectsDeleteFailed; }
95+
96+
public Counter getS3ObjectReadFailedCounter() { return s3ObjectReadFailedCounter; }
9297
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,15 @@ public long consumeS3Object(final S3InputFile inputFile, final S3DataSelection d
155155
final CompressionOption fileCompressionOption = compressionOption != CompressionOption.AUTOMATIC ?
156156
compressionOption : CompressionOption.fromFileName(s3ObjectReference.getKey());
157157

158-
codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
159-
consumer.accept(record, dataSelection);
160-
});
161-
return inputFile.getLength();
158+
try {
159+
codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
160+
consumer.accept(record, dataSelection);
161+
});
162+
return inputFile.getLength();
163+
} catch (final Exception e) {
164+
s3ObjectPluginMetrics.getS3ObjectReadFailedCounter().increment();
165+
throw new S3ReadFailedException(e);
166+
}
162167
}
163168
}
164169

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.s3;
12+
13+
/**
14+
* Exception thrown when there is a failure reading from S3 objects.
15+
* This is used to distinguish actual S3 read failures from other processing failures.
16+
*/
17+
public class S3ReadFailedException extends RuntimeException {
18+
public S3ReadFailedException(final Throwable cause) {
19+
super(cause);
20+
}
21+
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectPluginMetricsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,6 @@ public void s3ObjectPluginMetricsTest(){
4646
assertThat(metrics.getS3ObjectsFailedAccessDeniedCounter(),sameInstance(counter));
4747
assertThat(metrics.getS3ObjectsFailedNotFoundCounter(),sameInstance(counter));
4848
assertThat(metrics.getS3ObjectsThrottledCounter(),sameInstance(counter));
49+
assertThat(metrics.getS3ObjectReadFailedCounter(),sameInstance(counter));
4950
}
5051
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,12 @@ void S3ObjectWorker_with_MetadataOnly_Test() throws Exception {
289289
void processS3Object_codec_parse_exception() throws Exception {
290290
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
291291
when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(s3ObjectsFailedCounter);
292+
when(s3ObjectPluginMetrics.getS3ObjectReadFailedCounter()).thenReturn(mock(Counter.class));
292293

293294
doThrow(IOException.class).when(codec).parse(any(InputFile.class), any(DecompressionEngine.class), any(Consumer.class));
294295

295296
assertThrows(
296-
IOException.class,
297+
S3ReadFailedException.class,
297298
() -> createObjectUnderTest(s3ObjectPluginMetrics).processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, acknowledgementSet, null, null));
298299

299300
final ArgumentCaptor<InputFile> inputFileArgumentCaptor = ArgumentCaptor.forClass(InputFile.class);
@@ -302,6 +303,21 @@ void processS3Object_codec_parse_exception() throws Exception {
302303
assertThat(actualInputFile, instanceOf(S3InputFile.class));
303304
}
304305

306+
@Test
307+
void processS3Object_increments_s3ObjectReadFailed_counter_when_codec_parse_fails() throws Exception {
308+
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
309+
when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(s3ObjectsFailedCounter);
310+
final Counter s3ObjectReadFailedCounter = mock(Counter.class);
311+
when(s3ObjectPluginMetrics.getS3ObjectReadFailedCounter()).thenReturn(s3ObjectReadFailedCounter);
312+
313+
doThrow(IOException.class).when(codec).parse(any(InputFile.class), any(DecompressionEngine.class), any(Consumer.class));
314+
315+
assertThrows(S3ReadFailedException.class,
316+
() -> createObjectUnderTest(s3ObjectPluginMetrics).processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, acknowledgementSet, null, null));
317+
318+
verify(s3ObjectReadFailedCounter, times(1)).increment();
319+
}
320+
305321
@Test
306322
void processS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulator() throws Exception {
307323
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
@@ -415,6 +431,7 @@ void processS3Object_increments_success_counter_after_parsing_S3_object() throws
415431
@Test
416432
void processS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_parse_S3_object() throws IOException {
417433
when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(s3ObjectsFailedCounter);
434+
when(s3ObjectPluginMetrics.getS3ObjectReadFailedCounter()).thenReturn(mock(Counter.class));
418435

419436
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
420437

@@ -423,13 +440,13 @@ void processS3Object_throws_Exception_and_increments_failure_counter_when_unable
423440
.when(codec).parse(any(InputFile.class), any(DecompressionEngine.class), any(Consumer.class));
424441

425442
final S3ObjectHandler objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics);
426-
final IOException actualException = assertThrows(IOException.class, () -> objectUnderTest.processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, acknowledgementSet, null, null));
443+
final S3ReadFailedException actualException = assertThrows(S3ReadFailedException.class, () -> objectUnderTest.processS3Object(s3ObjectReference, S3DataSelection.DATA_AND_METADATA, acknowledgementSet, null, null));
427444

428-
assertThat(actualException, sameInstance(expectedException));
445+
assertThat(actualException.getCause(), sameInstance(expectedException));
429446

430447
verify(s3ObjectsFailedCounter).increment();
431448
verifyNoInteractions(s3ObjectsSucceededCounter);
432-
assertThat(exceptionThrownByCallable, sameInstance(expectedException));
449+
assertThat(exceptionThrownByCallable, sameInstance(actualException));
433450
}
434451

435452
@Test

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ void setup() {
152152
lenient().when(selectJsonOption.getType()).thenReturn(JSON_LINES_TYPE);
153153

154154
given(s3ObjectRequest.getS3ObjectPluginMetrics()).willReturn(s3ObjectPluginMetrics);
155+
lenient().when(s3ObjectPluginMetrics.getS3ObjectReadFailedCounter()).thenReturn(mock(Counter.class));
155156
bucketOwnerProvider = mock(BucketOwnerProvider.class);
156157
given(bucketOwnerProvider.getBucketOwner(any(String.class))).willReturn(Optional.of("my-bucket-1"));
157158
given(s3ObjectRequest.getBucketOwnerProvider()).willReturn(bucketOwnerProvider);

0 commit comments

Comments
 (0)