diff --git a/.changes/next-release/bugfix-S3TransferManager-e28a164.json b/.changes/next-release/bugfix-S3TransferManager-e28a164.json new file mode 100644 index 000000000000..2faa7bc68b8d --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-e28a164.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fixed an issue where cancelling a directory transfer did not fully stop the operation." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 75231bd262d5..dae8976e4bf5 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -56,6 +56,11 @@ public AsyncBufferingSubscriber(Function> consumer, returnFuture.whenComplete((r, t) -> { if (t != null) { requestsInFlight.forEach(f -> f.cancel(true)); + synchronized (this) { + if (subscription != null) { + subscription.cancel(); + } + } } }); } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index 3e64783561ec..9e471c0a777b 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -128,7 +128,21 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce subscriber.onSubscribe(mockSubscription); subscriber.onNext("item"); - verify(mockSubscription, times(1)).cancel(); + verify(mockSubscription, times(2)).cancel(); assertThatThrownBy(future::join).hasCause(exception); } + + @Test + void returnFutureCancelled_shouldCancelUpstreamSubscription() { + CompletableFuture future = new CompletableFuture<>(); + AsyncBufferingSubscriber subscriber = new AsyncBufferingSubscriber<>( + s -> new CompletableFuture<>(), future, 10); + + Subscription mockSubscription = mock(Subscription.class); + subscriber.onSubscribe(mockSubscription); + subscriber.onNext("item"); + + future.cancel(true); + verify(mockSubscription, times(1)).cancel(); + } }