2929import static org .junit .jupiter .api .Assertions .assertEquals ;
3030import static org .junit .jupiter .api .Assertions .assertNotEquals ;
3131import static org .junit .jupiter .api .Assertions .assertNotNull ;
32- import static org .junit .jupiter .api .Assertions .assertTrue ;
32+ import static software .amazon .awssdk .services .s3 .internal .multipart .utils .MultipartDownloadTestUtils .internalErrorBody ;
33+ import static software .amazon .awssdk .services .s3 .internal .multipart .utils .MultipartDownloadTestUtils .slowdownErrorBody ;
3334
3435import com .github .tomakehurst .wiremock .http .Fault ;
3536import com .github .tomakehurst .wiremock .junit5 .WireMockRuntimeInfo ;
3637import com .github .tomakehurst .wiremock .junit5 .WireMockTest ;
3738import com .github .tomakehurst .wiremock .stubbing .Scenario ;
3839import java .net .URI ;
39- import java .nio .ByteBuffer ;
4040import java .nio .charset .StandardCharsets ;
4141import java .time .Duration ;
4242import java .util .ArrayList ;
4646import java .util .concurrent .CompletableFuture ;
4747import java .util .concurrent .CompletionException ;
4848import java .util .concurrent .TimeUnit ;
49- import java .util .concurrent .atomic .AtomicBoolean ;
5049import org .junit .jupiter .api .BeforeAll ;
5150import org .junit .jupiter .api .BeforeEach ;
5251import org .junit .jupiter .api .Test ;
5352import org .junit .jupiter .api .Timeout ;
54- import org .reactivestreams .Subscriber ;
55- import org .reactivestreams .Subscription ;
5653import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
5754import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
5855import software .amazon .awssdk .awscore .retry .AwsRetryStrategy ;
5956import software .amazon .awssdk .core .ResponseBytes ;
6057import software .amazon .awssdk .core .async .AsyncResponseTransformer ;
61- import software .amazon .awssdk .core .async .SdkPublisher ;
6258import software .amazon .awssdk .core .exception .SdkClientException ;
6359import software .amazon .awssdk .core .interceptor .Context ;
6460import software .amazon .awssdk .core .interceptor .ExecutionAttributes ;
@@ -213,23 +209,6 @@ public void getObject_5xxErrorResponses_shouldNotReuseInitialRequestId() {
213209 verify (0 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=3" , BUCKET , KEY ))));
214210 }
215211
216- @ Test
217- public void getObject_ioException_shouldRetryAndFail () {
218- String firstRequestId = UUID .randomUUID ().toString ();
219- String secondRequestId = UUID .randomUUID ().toString ();
220-
221- stubIoError (1 );
222- assertThatThrownBy (() -> multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ),
223- AsyncResponseTransformer .toBytes ()).join ())
224- .isInstanceOf (CompletionException .class )
225- .hasCauseInstanceOf (SdkClientException .class ).hasMessageContaining ("The connection was closed" )
226- .hasStackTraceContaining ("Error encountered during GetObjectRequest" );
227-
228- verify (MAX_ATTEMPTS , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY ))));
229- verify (0 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=2" , BUCKET , KEY ))));
230- verify (0 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=3" , BUCKET , KEY ))));
231- }
232-
233212
234213 @ Test
235214 public void multipartDownload_200Response_shouldSucceed () {
@@ -254,16 +233,14 @@ public void multipartDownload_200Response_shouldSucceed() {
254233 @ Test
255234 public void multipartDownload_secondPartNonRetryableError_shouldFail () {
256235 stub200SuccessPart1 ();
257- stubError (2 , errorBody ( String . valueOf ( 500 ), "Internal Error" ));
236+ stubError (2 , internalErrorBody ( ));
258237
259238 CompletableFuture <ResponseBytes <GetObjectResponse >> future =
260239 multipartClient .getObject (GetObjectRequest .builder ().bucket (BUCKET ).key (KEY ).build (),
261240 AsyncResponseTransformer .toBytes ());
262241
263- assertThatThrownBy (() -> future .join ()).hasCauseInstanceOf (S3Exception .class )
264- .hasStackTraceContaining ("Error encountered "
265- + "during "
266- + "GetObjectRequest" );
242+ assertThatThrownBy (future ::join ).hasCauseInstanceOf (S3Exception .class )
243+ .hasMessageContaining ("We encountered an internal error. Please try again. (Service: S3, Status Code: 500" );
267244
268245 verify (1 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY ))));
269246 verify (MAX_ATTEMPTS , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=2" , BUCKET , KEY ))));
@@ -369,7 +346,22 @@ public void multipartDownload_503OnFirstPartAndSecondPart_shouldRetrySuccessfull
369346 }
370347
371348 @ Test
372- public void getObject_iOError_shouldRetrySuccessfully () {
349+ public void getObject_ioExceptionOnly_shouldExhaustRetriesAndFail () {
350+ stubIoError (1 );
351+ stub200SuccessPart2 ();
352+ stub200SuccessPart3 ();
353+ assertThatThrownBy (() -> multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ),
354+ AsyncResponseTransformer .toBytes ()).join ())
355+ .isInstanceOf (CompletionException .class )
356+ .hasCauseInstanceOf (SdkClientException .class );
357+
358+ verify (MAX_ATTEMPTS , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY ))));
359+ verify (0 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=2" , BUCKET , KEY ))));
360+ verify (0 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=3" , BUCKET , KEY ))));
361+ }
362+
363+ @ Test
364+ public void getObject_iOErrorThen200Response_shouldRetrySuccessfully () {
373365 String requestId = UUID .randomUUID ().toString ();
374366
375367 stubFor (any (anyUrl ())
@@ -405,32 +397,16 @@ public void getObject_iOError_shouldRetrySuccessfully() {
405397 assertEquals (requestId , finalRequestId );
406398 }
407399
408- private String errorBody (String errorCode , String errorMessage ) {
409- return "<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>\n "
410- + "<Error>\n "
411- + " <Code>" + errorCode + "</Code>\n "
412- + " <Message>" + errorMessage + "</Message>\n "
413- + "</Error>" ;
414- }
415-
416- private String internalErrorBody () {
417- return errorBody ("InternalError" , "We encountered an internal error. Please try again." );
418- }
419-
420- private String slowdownErrorBody () {
421- return errorBody ("SlowDown" , "Please reduce your request rate." );
422- }
423-
424400 private void stubError (int partNumber , String errorBody ) {
425- stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=" + partNumber , BUCKET , KEY )))
401+ stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=%d" , BUCKET , KEY , partNumber )))
426402 .willReturn (aResponse ()
427403 .withHeader ("x-amz-request-id" , String .valueOf (UUID .randomUUID ()))
428404 .withHeader ("x-amz-mp-parts-count" , String .valueOf (TOTAL_PARTS ))
429405 .withStatus (500 ).withBody (errorBody )));
430406 }
431407
432408 private void stubIoError (int partNumber ) {
433- stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=" + partNumber , BUCKET , KEY )))
409+ stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=%d" , BUCKET , KEY , partNumber )))
434410 .willReturn (aResponse ()
435411 .withFault (Fault .CONNECTION_RESET_BY_PEER )));
436412 }
@@ -520,62 +496,4 @@ public void clear() {
520496 responses .clear ();
521497 }
522498 }
523-
524- /**
525- * Custom AsyncResponseTransformer that simulates an error occurring after onStream() has been called
526- */
527- private static final class StreamingErrorTransformer
528- implements AsyncResponseTransformer <GetObjectResponse , ResponseBytes <GetObjectResponse >> {
529-
530- private final CompletableFuture <ResponseBytes <GetObjectResponse >> future = new CompletableFuture <>();
531- private final AtomicBoolean errorThrown = new AtomicBoolean ();
532- private final AtomicBoolean onStreamCalled = new AtomicBoolean ();
533-
534- @ Override
535- public CompletableFuture <ResponseBytes <GetObjectResponse >> prepare () {
536- return future ;
537- }
538-
539- @ Override
540- public void onResponse (GetObjectResponse response ) {
541- //
542- }
543-
544- @ Override
545- public void onStream (SdkPublisher <ByteBuffer > publisher ) {
546- onStreamCalled .set (true );
547- publisher .subscribe (new Subscriber <ByteBuffer >() {
548- private Subscription subscription ;
549-
550- @ Override
551- public void onSubscribe (Subscription s ) {
552- this .subscription = s ;
553- s .request (1 );
554- }
555-
556- @ Override
557- public void onNext (ByteBuffer byteBuffer ) {
558- if (errorThrown .compareAndSet (false , true )) {
559- future .completeExceptionally (new RuntimeException ());
560- subscription .cancel ();
561- }
562- }
563-
564- @ Override
565- public void onError (Throwable t ) {
566- future .completeExceptionally (t );
567- }
568-
569- @ Override
570- public void onComplete () {
571- //
572- }
573- });
574- }
575-
576- @ Override
577- public void exceptionOccurred (Throwable throwable ) {
578- future .completeExceptionally (throwable );
579- }
580- }
581499}
0 commit comments