From f1af8b7d337cdf6ad346f97dca992e38038b049e Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 10:27:22 -0700 Subject: [PATCH 1/4] Cancel AsyncBufferingSubscriber upstream subscription when future is cancelled or fails --- .../s3/internal/AsyncBufferingSubscriber.java | 5 ++++- .../s3/internal/AsyncBufferingSubscriberTest.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 75231bd262d5..832163fc65e7 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -48,7 +48,7 @@ public AsyncBufferingSubscriber(Function> consumer, CompletableFuture returnFuture, int maxConcurrentExecutions) { this.returnFuture = returnFuture; - this.consumer = consumer; + this.consumer = consumer; this.maxConcurrentExecutions = maxConcurrentExecutions; this.numRequestsInFlight = new AtomicInteger(0); this.requestsInFlight = ConcurrentHashMap.newKeySet(); @@ -56,6 +56,9 @@ public AsyncBufferingSubscriber(Function> consumer, returnFuture.whenComplete((r, t) -> { if (t != null) { requestsInFlight.forEach(f -> f.cancel(true)); + if (subscription != null) { + subscription.cancel(); + } } }); } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index 3e64783561ec..4d9da2ceaff2 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -131,4 +131,18 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce verify(mockSubscription, times(1)).cancel(); assertThatThrownBy(future::join).hasCause(exception); } + + @Test + void returnFutureCancelled_shouldCancelUpstreamSubscription() { + CompletableFuture future = new CompletableFuture<>(); + AsyncBufferingSubscriber subscriber = new AsyncBufferingSubscriber<>( + s -> new CompletableFuture<>(), future, 10); + + Subscription mockSubscription = mock(Subscription.class); + subscriber.onSubscribe(mockSubscription); + subscriber.onNext("item"); + + future.cancel(true); + verify(mockSubscription, times(1)).cancel(); + } } From 6b3058afd403d047c586599ebffed0ed7e8dc6ab Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 10:40:15 -0700 Subject: [PATCH 2/4] Add changelog --- .changes/next-release/bugfix-S3TransferManager-e28a164.json | 6 ++++++ .../transfer/s3/internal/AsyncBufferingSubscriber.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 .changes/next-release/bugfix-S3TransferManager-e28a164.json diff --git a/.changes/next-release/bugfix-S3TransferManager-e28a164.json b/.changes/next-release/bugfix-S3TransferManager-e28a164.json new file mode 100644 index 000000000000..2faa7bc68b8d --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-e28a164.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fixed an issue where cancelling a directory transfer did not fully stop the operation." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 832163fc65e7..64352787372f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -48,7 +48,7 @@ public AsyncBufferingSubscriber(Function> consumer, CompletableFuture returnFuture, int maxConcurrentExecutions) { this.returnFuture = returnFuture; - this.consumer = consumer; + this.consumer = consumer; this.maxConcurrentExecutions = maxConcurrentExecutions; this.numRequestsInFlight = new AtomicInteger(0); this.requestsInFlight = ConcurrentHashMap.newKeySet(); From 9837072e69109a7cf198520c3b91eadda985c742 Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:10:08 -0700 Subject: [PATCH 3/4] Fix test --- .../transfer/s3/internal/AsyncBufferingSubscriberTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index 4d9da2ceaff2..9e471c0a777b 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -128,7 +128,7 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce subscriber.onSubscribe(mockSubscription); subscriber.onNext("item"); - verify(mockSubscription, times(1)).cancel(); + verify(mockSubscription, times(2)).cancel(); assertThatThrownBy(future::join).hasCause(exception); } From 9e746e9565fd75e5af36bb696a9dd23e2dad8002 Mon Sep 17 00:00:00 2001 From: RanVaknin <50976344+RanVaknin@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:33:05 -0700 Subject: [PATCH 4/4] Update whenComplete to include a synchronization lock --- .../transfer/s3/internal/AsyncBufferingSubscriber.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 64352787372f..dae8976e4bf5 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -56,8 +56,10 @@ public AsyncBufferingSubscriber(Function> consumer, returnFuture.whenComplete((r, t) -> { if (t != null) { requestsInFlight.forEach(f -> f.cancel(true)); - if (subscription != null) { - subscription.cancel(); + synchronized (this) { + if (subscription != null) { + subscription.cancel(); + } } } });