Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-e28a164.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fixed an issue where cancelling a directory transfer did not fully stop the operation."
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> consumer,
returnFuture.whenComplete((r, t) -> {
if (t != null) {
requestsInFlight.forEach(f -> f.cancel(true));
synchronized (this) {
if (subscription != null) {
subscription.cancel();
}
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,21 @@ public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExce
subscriber.onSubscribe(mockSubscription);
subscriber.onNext("item");

verify(mockSubscription, times(1)).cancel();
verify(mockSubscription, times(2)).cancel();
Copy link
Copy Markdown
Contributor Author

@RanVaknin RanVaknin Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscription.cancel() now exists in two codepaths:

  1. The existing codepath within onNext() catch block.
  2. The new addition in future.whenComplete()

Flow:

- onNext()
-- catch
-- subscription.cancel()
-- onError()

- onError()
-- completes exceptionally
-- future.whenCompleted()
-- subscription.cancel() // <-- new addition

I believe this is safe to call twice because:

5 Subscription.cancel MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe.

Note: the codepath which the new addition solves is when the cancellation signal comes from outside of the pub/sub pipeline. If a user aborts the operation by cancelling the DirectoryDownload completion future directly, neither onNext nor onError are involved. The whenComplete handler is the only place that reacts to this external cancellation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth adding a quick comment here?

assertThatThrownBy(future::join).hasCause(exception);
}

@Test
void returnFutureCancelled_shouldCancelUpstreamSubscription() {
CompletableFuture<Void> future = new CompletableFuture<>();
AsyncBufferingSubscriber<String> subscriber = new AsyncBufferingSubscriber<>(
s -> new CompletableFuture<>(), future, 10);

Subscription mockSubscription = mock(Subscription.class);
subscriber.onSubscribe(mockSubscription);
subscriber.onNext("item");

future.cancel(true);
verify(mockSubscription, times(1)).cancel();
}
}
Loading