Skip to content

Commit 85f6c0f

Browse files
authored
Fix SimplePublisher allowing re-subscription after cancellation (#6858)
1 parent fa80722 commit 85f6c0f

File tree

6 files changed

+211
-5
lines changed

6 files changed

+211
-5
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed an issue where using a getObject ResponsePublisher as a putObject request body with the CRT HTTP client could cause the SDK to hang on retry when the server returns a retryable error."
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.functionaltests;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.put;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
24+
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
25+
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
26+
import java.net.URI;
27+
import java.nio.ByteBuffer;
28+
import java.nio.charset.StandardCharsets;
29+
import java.util.Optional;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.TimeoutException;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.reactivestreams.Subscriber;
37+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
38+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
39+
import software.amazon.awssdk.core.async.AsyncRequestBody;
40+
import software.amazon.awssdk.core.exception.SdkClientException;
41+
import software.amazon.awssdk.regions.Region;
42+
import software.amazon.awssdk.services.s3.S3AsyncClient;
43+
import software.amazon.awssdk.utils.async.SimplePublisher;
44+
45+
/**
46+
* Tests that when a {@link SimplePublisher} is used as the putObject request body
47+
* and putObject fails with a retryable error, the SDK retry completes promptly instead of hanging.
48+
*/
49+
@WireMockTest
50+
public class PutObjectRequestBodyFromSimplePublisherRetryTest {
51+
52+
private static final String BUCKET = "test-bucket";
53+
private static final String WRITE_KEY = "dest-object";
54+
private static final byte[] BODY = "hello world test data".getBytes(StandardCharsets.UTF_8);
55+
56+
private S3AsyncClient s3Client;
57+
58+
@BeforeEach
59+
void setup(WireMockRuntimeInfo wiremock) {
60+
URI endpoint = URI.create("http://localhost:" + wiremock.getHttpPort());
61+
62+
s3Client = S3AsyncClient.builder()
63+
.region(Region.US_EAST_1)
64+
.endpointOverride(endpoint)
65+
.credentialsProvider(StaticCredentialsProvider.create(
66+
AwsBasicCredentials.create("key", "secret")))
67+
.forcePathStyle(true)
68+
.build();
69+
}
70+
71+
@AfterEach
72+
void tearDown() {
73+
s3Client.close();
74+
}
75+
76+
@Test
77+
void putObjectRetry_withSimplePublisherAsRequestBody_shouldFailFast() {
78+
stubFor(put(urlPathEqualTo("/" + BUCKET + "/" + WRITE_KEY))
79+
.willReturn(aResponse()
80+
.withStatus(500)
81+
.withBody("<Error><Code>InternalError</Code>"
82+
+ "<Message>Internal Server Error</Message></Error>")));
83+
84+
// Simulate a ResponsePublisher backed by SimplePublisher (e.g., from getObject)
85+
SimplePublisher<ByteBuffer> sourcePublisher = new SimplePublisher<>();
86+
sourcePublisher.send(ByteBuffer.wrap(BODY));
87+
sourcePublisher.complete();
88+
89+
AsyncRequestBody requestBody = new AsyncRequestBody() {
90+
@Override
91+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
92+
sourcePublisher.subscribe(subscriber);
93+
}
94+
95+
@Override
96+
public Optional<Long> contentLength() {
97+
return Optional.of((long) BODY.length);
98+
}
99+
};
100+
101+
CompletableFuture<?> putFuture =
102+
s3Client.putObject(r -> r.bucket(BUCKET).key(WRITE_KEY), requestBody);
103+
104+
// Should fail fast, not hang for 30+ seconds.
105+
assertThatThrownBy(() -> putFuture.get(5, TimeUnit.SECONDS))
106+
.isNotInstanceOf(TimeoutException.class)
107+
.hasCauseInstanceOf(SdkClientException.class);
108+
}
109+
}

utils/src/main/java/software/amazon/awssdk/utils/async/AddingTrailingDataSubscriber.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public void onSubscribe(Subscription subscription) {
7272
if (upstreamSubscription != null) {
7373
log.warn(() -> "Received duplicate subscription, cancelling the duplicate.", new IllegalStateException());
7474
subscription.cancel();
75+
onError(new IllegalStateException("Received duplicate subscription."));
7576
return;
7677
}
7778

utils/src/main/java/software/amazon/awssdk/utils/async/SimplePublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ public void subscribe(Subscriber<? super T> s) {
199199
if (subscriber != null) {
200200
s.onSubscribe(new NoOpSubscription());
201201
s.onError(new IllegalStateException("Only one subscription may be active at a time."));
202+
return;
202203
}
204+
203205
this.subscriber = s;
204206
s.onSubscribe(new SubscriptionImpl());
205207
processEventQueue();
@@ -284,8 +286,6 @@ private void doProcessQueue() {
284286
break;
285287
case CANCEL:
286288
failureMessage.trySet(() -> new CancellationException("subscription has been cancelled."));
287-
288-
subscriber = null; // Allow subscriber to be garbage collected after cancellation.
289289
break;
290290
default:
291291
// Should never happen. Famous last words?

utils/src/test/java/software/amazon/awssdk/utils/async/AddingTrailingDataSubscriberTest.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20-
21-
import com.google.common.collect.Lists;
2220
import java.util.ArrayList;
2321
import java.util.Arrays;
2422
import java.util.List;
2523
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.atomic.AtomicReference;
2625
import org.junit.jupiter.api.Test;
2726
import org.reactivestreams.Subscriber;
27+
import org.reactivestreams.Subscription;
2828

2929
public class AddingTrailingDataSubscriberTest {
3030

@@ -96,4 +96,54 @@ private void publishData(Subscriber<Integer> subscriber) {
9696
}
9797
simplePublisher.complete();
9898
}
99+
100+
@Test
101+
void duplicateOnSubscribe_shouldPropagateErrorToDownstream() {
102+
AtomicReference<Throwable> downstreamError = new AtomicReference<>();
103+
CompletableFuture<Void> future = new CompletableFuture<>();
104+
105+
Subscriber<Integer> downstream = new Subscriber<Integer>() {
106+
@Override
107+
public void onSubscribe(Subscription s) {
108+
s.request(Long.MAX_VALUE);
109+
}
110+
111+
@Override
112+
public void onNext(Integer item) {
113+
}
114+
115+
@Override
116+
public void onError(Throwable t) {
117+
downstreamError.set(t);
118+
future.completeExceptionally(t);
119+
}
120+
121+
@Override
122+
public void onComplete() {
123+
future.complete(null);
124+
}
125+
};
126+
127+
AddingTrailingDataSubscriber<Integer> subscriber =
128+
new AddingTrailingDataSubscriber<>(downstream, ArrayList::new);
129+
130+
// First subscription
131+
Subscription firstSub = new Subscription() {
132+
@Override
133+
public void request(long n) {
134+
}
135+
136+
@Override
137+
public void cancel() {
138+
}
139+
};
140+
subscriber.onSubscribe(firstSub);
141+
142+
// Duplicate subscription should propagate error to downstream
143+
subscriber.onSubscribe(firstSub);
144+
145+
assertThat(downstreamError.get())
146+
.isInstanceOf(IllegalStateException.class)
147+
.hasMessageContaining("Received duplicate subscription");
148+
}
99149
}

utils/src/test/java/software/amazon/awssdk/utils/async/SimplePublisherTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,4 +563,44 @@ public void cancel() {
563563
}
564564
}
565565

566-
}
566+
@Test
567+
public void subscribeAfterCancel_rejectsWithError() {
568+
SimplePublisher<Integer> publisher = new SimplePublisher<>();
569+
ControllableSubscriber<Integer> subscriber = new ControllableSubscriber<>();
570+
publisher.subscribe(subscriber);
571+
572+
subscriber.subscription.cancel();
573+
574+
// Second subscriber after cancel should get onError immediately
575+
StoringSubscriber<Integer> secondSubscriber = new StoringSubscriber<>(1);
576+
publisher.subscribe(secondSubscriber);
577+
578+
assertThat(secondSubscriber.peek().get().type()).isEqualTo(EventType.ON_ERROR);
579+
assertThat(secondSubscriber.peek().get().runtimeError())
580+
.isInstanceOf(IllegalStateException.class)
581+
.hasMessageContaining("Only one subscription may be active");
582+
}
583+
584+
@Test
585+
public void subscribeAfterCancelAndDataConsumed_rejectsWithError() {
586+
SimplePublisher<Integer> publisher = new SimplePublisher<>();
587+
StoringSubscriber<Integer> subscriber = new StoringSubscriber<>(Integer.MAX_VALUE);
588+
publisher.subscribe(subscriber);
589+
590+
publisher.send(1);
591+
publisher.send(2);
592+
publisher.complete();
593+
594+
while (subscriber.poll().isPresent()) {
595+
// drain
596+
}
597+
598+
// Now re-subscribe — should be rejected immediately
599+
StoringSubscriber<Integer> secondSubscriber = new StoringSubscriber<>(1);
600+
publisher.subscribe(secondSubscriber);
601+
602+
assertThat(secondSubscriber.peek().get().type()).isEqualTo(EventType.ON_ERROR);
603+
assertThat(secondSubscriber.peek().get().runtimeError())
604+
.isInstanceOf(IllegalStateException.class);
605+
}
606+
}

0 commit comments

Comments
 (0)