Skip to content

Commit 5d81f58

Browse files
authored
Merge branch 'master' into dongie/dont-copy-rules1
2 parents 5e06dd8 + 0e0d18f commit 5d81f58

10 files changed

Lines changed: 217 additions & 111 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fix an issue where `StackOverflowError` can occur when iterating over large pages from an async paginator. This can manifest as the publisher hanging/never reaching the end of the stream. Fixes [#6411](https://github.com/aws/aws-sdk-java-v2/issues/6411)."
6+
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/AsyncResponseClassSpec.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424
import com.squareup.javapoet.TypeSpec;
2525
import com.squareup.javapoet.WildcardTypeName;
2626
import java.util.Collections;
27-
import java.util.Iterator;
2827
import java.util.Objects;
2928
import java.util.concurrent.CompletableFuture;
3029
import java.util.function.Consumer;
31-
import java.util.function.Function;
3230
import java.util.stream.Collectors;
3331
import java.util.stream.Stream;
3432
import javax.lang.model.element.Modifier;
@@ -40,7 +38,6 @@
4038
import software.amazon.awssdk.codegen.poet.PoetUtils;
4139
import software.amazon.awssdk.core.async.SdkPublisher;
4240
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
43-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
4441
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
4542

4643
/**
@@ -200,21 +197,23 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
200197
return null;
201198
}
202199

200+
String fluentGetter = fluentGetterMethodForResponseMember(resultKey);
201+
202+
CodeBlock.Builder iterableFnBuilder = CodeBlock.builder()
203+
.add("$1N -> $1N.$2L", RESPONSE_LITERAL, fluentGetter);
204+
205+
String fluentGetterMethodName = resultKeyModel.getFluentGetterMethodName();
206+
207+
if (resultKeyModel.isMap()) {
208+
iterableFnBuilder.add(".entrySet()");
209+
}
210+
203211
TypeName resultKeyType = getTypeForResultKey(resultKey);
204212

205-
return MethodSpec.methodBuilder(resultKeyModel.getFluentGetterMethodName())
213+
return MethodSpec.methodBuilder(fluentGetterMethodName)
206214
.addModifiers(Modifier.PUBLIC, Modifier.FINAL)
207215
.returns(ParameterizedTypeName.get(ClassName.get(SdkPublisher.class), resultKeyType))
208-
.addCode("$T getIterator = ",
209-
ParameterizedTypeName.get(ClassName.get(Function.class),
210-
responseType(),
211-
ParameterizedTypeName.get(ClassName.get(Iterator.class),
212-
resultKeyType)))
213-
.addCode(getIteratorLambdaBlock(resultKey, resultKeyModel))
214-
.addCode("\n")
215-
.addStatement("return $1T.builder().$2L(new $3L()).iteratorFunction(getIterator).$4L($4L).build()",
216-
PaginatedItemsPublisher.class, NEXT_PAGE_FETCHER_MEMBER, nextPageFetcherClassName(),
217-
LAST_PAGE_FIELD)
216+
.addStatement("return this.flatMapIterable($L)", iterableFnBuilder.build())
218217
.addJavadoc(CodeBlock.builder()
219218
.add("Returns a publisher that can be used to get a stream of data. You need to "
220219
+ "subscribe to the publisher to request the stream of data. The publisher "

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyAndMoreResultsPublisher.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package software.amazon.awssdk.services.jsonprotocoltests.paginators;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import java.util.concurrent.CompletableFuture;
6-
import java.util.function.Function;
74
import org.reactivestreams.Subscriber;
85
import software.amazon.awssdk.annotations.Generated;
96
import software.amazon.awssdk.core.async.SdkPublisher;
107
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
11-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
128
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
139
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
1410
import software.amazon.awssdk.services.jsonprotocoltests.internal.UserAgentUtils;
@@ -107,15 +103,7 @@ public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyAndMoreR
107103
* and then applies that consumer to each response returned by the service.
108104
*/
109105
public final SdkPublisher<SimpleStruct> items() {
110-
Function<PaginatedOperationWithResultKeyAndMoreResultsResponse, Iterator<SimpleStruct>> getIterator = response -> {
111-
if (response != null && response.items() != null) {
112-
return response.items().iterator();
113-
}
114-
return Collections.emptyIterator();
115-
};
116-
return PaginatedItemsPublisher.builder()
117-
.nextPageFetcher(new PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher())
118-
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
106+
return this.flatMapIterable(response -> response.items());
119107
}
120108

121109
private class PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher implements

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package software.amazon.awssdk.services.jsonprotocoltests.paginators;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import java.util.concurrent.CompletableFuture;
6-
import java.util.function.Function;
74
import org.reactivestreams.Subscriber;
85
import software.amazon.awssdk.annotations.Generated;
96
import software.amazon.awssdk.core.async.SdkPublisher;
107
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
11-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
128
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
139
import software.amazon.awssdk.core.util.PaginatorUtils;
1410
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
@@ -107,14 +103,7 @@ public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyResponse
107103
* and then applies that consumer to each response returned by the service.
108104
*/
109105
public final SdkPublisher<SimpleStruct> items() {
110-
Function<PaginatedOperationWithResultKeyResponse, Iterator<SimpleStruct>> getIterator = response -> {
111-
if (response != null && response.items() != null) {
112-
return response.items().iterator();
113-
}
114-
return Collections.emptyIterator();
115-
};
116-
return PaginatedItemsPublisher.builder().nextPageFetcher(new PaginatedOperationWithResultKeyResponseFetcher())
117-
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
106+
return this.flatMapIterable(response -> response.items());
118107
}
119108

120109
private class PaginatedOperationWithResultKeyResponseFetcher implements

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiPublisher.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package software.amazon.awssdk.services.jsonprotocoltests.paginators;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import java.util.concurrent.CompletableFuture;
6-
import java.util.function.Function;
74
import org.reactivestreams.Subscriber;
85
import software.amazon.awssdk.annotations.Generated;
96
import software.amazon.awssdk.core.async.SdkPublisher;
107
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
11-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
128
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
139
import software.amazon.awssdk.core.util.PaginatorUtils;
1410
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
@@ -85,7 +81,7 @@ public SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Same
8581
}
8682

8783
private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, SameTokenPaginationApiRequest firstRequest,
88-
boolean isLastPage) {
84+
boolean isLastPage) {
8985
this.client = client;
9086
this.firstRequest = firstRequest;
9187
this.isLastPage = isLastPage;
@@ -94,7 +90,7 @@ private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Sam
9490
@Override
9591
public void subscribe(Subscriber<? super SameTokenPaginationApiResponse> subscriber) {
9692
subscriber.onSubscribe(ResponsesSubscription.builder().subscriber(subscriber)
97-
.nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build());
93+
.nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build());
9894
}
9995

10096
/**
@@ -103,14 +99,7 @@ public void subscribe(Subscriber<? super SameTokenPaginationApiResponse> subscri
10399
* and then applies that consumer to each response returned by the service.
104100
*/
105101
public final SdkPublisher<SimpleStruct> items() {
106-
Function<SameTokenPaginationApiResponse, Iterator<SimpleStruct>> getIterator = response -> {
107-
if (response != null && response.items() != null) {
108-
return response.items().iterator();
109-
}
110-
return Collections.emptyIterator();
111-
};
112-
return PaginatedItemsPublisher.builder().nextPageFetcher(new SameTokenPaginationApiResponseFetcher())
113-
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
102+
return this.flatMapIterable(response -> response.items());
114103
}
115104

116105
private class SameTokenPaginationApiResponseFetcher implements AsyncPageFetcher<SameTokenPaginationApiResponse> {

core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @param <ItemT> The type of paginated member in a response page
3131
*/
3232
@SdkProtectedApi
33+
@Deprecated
3334
public final class PaginatedItemsPublisher<ResponseT, ItemT> implements SdkPublisher<ItemT> {
3435

3536
private final AsyncPageFetcher<ResponseT> nextPageFetcher;

core/sdk-core/src/test/java/utils/SdkSubscriberTest.java

Lines changed: 44 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,22 @@
1515

1616
package utils;
1717

18+
import io.reactivex.Flowable;
1819
import org.junit.Before;
1920
import org.junit.Test;
2021
import org.junit.runner.RunWith;
2122
import org.mockito.Mock;
2223
import org.mockito.Mockito;
2324
import org.mockito.junit.MockitoJUnitRunner;
2425
import org.reactivestreams.Subscriber;
25-
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
26-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
26+
import software.amazon.awssdk.core.async.SdkPublisher;
2727
import software.amazon.awssdk.utils.async.LimitingSubscriber;
2828
import software.amazon.awssdk.utils.internal.async.EmptySubscription;
2929

3030
import java.util.ArrayList;
3131
import java.util.Arrays;
3232
import java.util.Iterator;
3333
import java.util.List;
34-
import java.util.concurrent.CompletableFuture;
3534
import java.util.concurrent.ExecutionException;
3635
import java.util.concurrent.ExecutorService;
3736
import java.util.concurrent.Executors;
@@ -48,96 +47,83 @@ public class SdkSubscriberTest {
4847

4948
public static final Function<Integer, Iterator<Integer>> SAMPLE_ITERATOR = response -> Arrays.asList(1, 2, 3, 4, 5, 6).listIterator();
5049
public static final Function<Integer, Iterator<Integer>> EMPTY_ITERATOR = response -> new ArrayList<Integer>().listIterator();
51-
@Mock
52-
AsyncPageFetcher asyncPageFetcher;
53-
PaginatedItemsPublisher<Integer, Integer> itemsPublisher;
5450

5551
@Mock
5652
Subscriber<Integer> mockSubscriber;
5753

54+
private SdkPublisher<Integer> sdkPublisher;
55+
5856
@Before
5957
public void setUp() {
60-
doReturn(CompletableFuture.completedFuture(1))
61-
.when(asyncPageFetcher).nextPage(null);
62-
doReturn(false)
63-
.when(asyncPageFetcher).hasNextPage(any());
58+
sdkPublisher = SdkPublisher.adapt(Flowable.just(1, 2, 3, 4, 5, 6));
6459
}
6560

6661
@Test
6762
public void limitingSubscriber_with_different_limits() throws InterruptedException, ExecutionException, TimeoutException {
68-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
69-
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
63+
List<Integer> belowLimit = new ArrayList<>();
64+
sdkPublisher.limit(3).subscribe(belowLimit::add).get(5, TimeUnit.SECONDS);
65+
assertThat(belowLimit).containsExactly(1, 2, 3);
7066

71-
final List<Integer> belowLimit = new ArrayList<>();
72-
itemsPublisher.limit(3).subscribe(e -> belowLimit.add(e)).get(5, TimeUnit.SECONDS);
73-
assertThat(belowLimit).isEqualTo(Arrays.asList(1, 2, 3));
67+
List<Integer> beyondLimit = new ArrayList<>();
68+
sdkPublisher.limit(33).subscribe(beyondLimit::add).get(5, TimeUnit.SECONDS);
69+
assertThat(beyondLimit).containsExactly(1, 2, 3, 4, 5, 6);
7470

75-
final List<Integer> beyondLimit = new ArrayList<>();
76-
itemsPublisher.limit(33).subscribe(e -> beyondLimit.add(e)).get(5, TimeUnit.SECONDS);
77-
assertThat(beyondLimit).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
78-
79-
final List<Integer> zeroLimit = new ArrayList<>();
80-
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
81-
assertThat(zeroLimit).isEqualTo(Arrays.asList());
71+
List<Integer> zeroLimit = new ArrayList<>();
72+
sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS);
73+
assertThat(zeroLimit).isEmpty();
8274
}
8375

8476
@Test
8577
public void filteringSubscriber_with_different_filters() throws InterruptedException, ExecutionException, TimeoutException {
86-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
87-
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
88-
89-
final List<Integer> filteredSomeList = new ArrayList<>();
90-
itemsPublisher.filter(i -> i % 2 == 0).subscribe(e -> filteredSomeList.add(e)).get(5, TimeUnit.SECONDS);
91-
assertThat(filteredSomeList).isEqualTo(Arrays.asList(2, 4, 6));
9278

93-
final List<Integer> filteredAllList = new ArrayList<>();
94-
itemsPublisher.filter(i -> i % 10 == 0).subscribe(e -> filteredAllList.add(e)).get(5, TimeUnit.SECONDS);
95-
assertThat(filteredAllList).isEqualTo(Arrays.asList());
79+
List<Integer> filteredSomeList = new ArrayList<>();
80+
sdkPublisher.filter(i -> i % 2 == 0).subscribe(filteredSomeList::add).get(5, TimeUnit.SECONDS);
81+
assertThat(filteredSomeList).containsExactly(2, 4, 6);
9682

97-
final List<Integer> filteredNone = new ArrayList<>();
98-
itemsPublisher.filter(i -> i % 1 == 0).subscribe(e -> filteredNone.add(e)).get(5, TimeUnit.SECONDS);
99-
assertThat(filteredNone).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
83+
List<Integer> filteredAllList = new ArrayList<>();
84+
sdkPublisher.filter(i -> i % 10 == 0).subscribe(filteredAllList::add).get(5, TimeUnit.SECONDS);
85+
assertThat(filteredAllList).isEmpty();
10086

87+
List<Integer> filteredNone = new ArrayList<>();
88+
sdkPublisher.filter(i -> i % 1 == 0).subscribe(filteredNone::add).get(5, TimeUnit.SECONDS);
89+
assertThat(filteredNone).containsExactly(1, 2, 3, 4, 5, 6);
10190
}
10291

10392
@Test
10493
public void limit_and_filter_subscriber_chained_with_different_conditions() throws InterruptedException, ExecutionException, TimeoutException {
105-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
106-
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
10794

108-
final List<Integer> belowLimitWithFiltering = new ArrayList<>();
109-
itemsPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(e -> belowLimitWithFiltering.add(e)).get(5, TimeUnit.SECONDS);
110-
assertThat(belowLimitWithFiltering).isEqualTo(Arrays.asList(2, 4));
95+
List<Integer> belowLimitWithFiltering = new ArrayList<>();
96+
sdkPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(belowLimitWithFiltering::add).get(5, TimeUnit.SECONDS);
97+
assertThat(belowLimitWithFiltering).containsExactly(2, 4);
11198

112-
final List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
113-
itemsPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(e -> beyondLimitWithAllFiltering.add(e)).get(5, TimeUnit.SECONDS);
114-
assertThat(beyondLimitWithAllFiltering).isEqualTo(Arrays.asList());
99+
List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
100+
sdkPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(beyondLimitWithAllFiltering::add).get(5, TimeUnit.SECONDS);
101+
assertThat(beyondLimitWithAllFiltering).isEmpty();
115102

116-
final List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
117-
itemsPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(e -> zeroLimitAndNoFilter.add(e)).get(5, TimeUnit.SECONDS);
118-
assertThat(zeroLimitAndNoFilter).isEqualTo(Arrays.asList());
103+
List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
104+
sdkPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(zeroLimitAndNoFilter::add).get(5, TimeUnit.SECONDS);
105+
assertThat(zeroLimitAndNoFilter).isEmpty();
119106

120-
final List<Integer> filteringbelowLimitWith = new ArrayList<>();
121-
itemsPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(e -> filteringbelowLimitWith.add(e)).get(5, TimeUnit.SECONDS);
122-
assertThat(filteringbelowLimitWith).isEqualTo(Arrays.asList(2, 4));
107+
List<Integer> filteringbelowLimitWith = new ArrayList<>();
108+
sdkPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(filteringbelowLimitWith::add).get(5, TimeUnit.SECONDS);
109+
assertThat(filteringbelowLimitWith).containsExactly(2, 4);
123110

124-
final List<Integer> filteringAndOutsideLimit = new ArrayList<>();
125-
itemsPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(e -> filteringAndOutsideLimit.add(e)).get(5, TimeUnit.SECONDS);
126-
assertThat(filteringAndOutsideLimit).isEqualTo(Arrays.asList());
111+
List<Integer> filteringAndOutsideLimit = new ArrayList<>();
112+
sdkPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(filteringAndOutsideLimit::add).get(5, TimeUnit.SECONDS);
113+
assertThat(filteringAndOutsideLimit).isEmpty();
127114
}
128115

129116
@Test
130117
public void limit__subscriber_with_empty_input_and_zero_limit() throws Exception {
131-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
132-
.iteratorFunction(EMPTY_ITERATOR).isLastPage(false).build();
118+
sdkPublisher = SdkPublisher.adapt(Flowable.empty());
133119

134-
final List<Integer> zeroLimit = new ArrayList<>();
135-
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
136-
assertThat(zeroLimit).isEqualTo(Arrays.asList());
120+
List<Integer> zeroLimit = new ArrayList<>();
121+
sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS);
122+
assertThat(zeroLimit).isEmpty();
137123

138124
List<Integer> nonZeroLimit = new ArrayList<>();
139-
itemsPublisher.limit(10).subscribe(e -> nonZeroLimit.add(e)).get(5, TimeUnit.SECONDS);
140-
assertThat(zeroLimit).isEqualTo(Arrays.asList());
125+
sdkPublisher.limit(10).subscribe(nonZeroLimit::add).get(5, TimeUnit.SECONDS);
126+
assertThat(zeroLimit).isEmpty();
141127
}
142128

143129

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"pagination": {
3+
"ListStrings": {
4+
"input_token": "NextToken",
5+
"limit_key": "MaxResults",
6+
"output_token": "NextToken",
7+
"result_key": "Strings"
8+
}
9+
}
10+
}

0 commit comments

Comments
 (0)