Skip to content

Commit 0e0d18f

Browse files
authored
Fix stackoverflow with large pages in paginator (#6668)
* Fix stackoverflow with large pages in paginator Previously, signalig onNext() to the subscriber was done via recursion, pulling elements from an iterator over the current page returned by the service. However, this can quickly lead to a stackoverflow error since the stack will grow linearly with the size of the page. This commit fixes this issue by using SdkPublisher's builtin flatMapIterable(), which uses a loop to signal onNext(), and also ensures that it does not call itself recursively. fixes #6411 * Reintroduce the protected API for backcompat * Review comments * Deprecated item publisher
1 parent 4634a22 commit 0e0d18f

10 files changed

Lines changed: 217 additions & 111 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fix an issue where `StackOverflowError` can occur when iterating over large pages from an async paginator. This can manifest as the publisher hanging/never reaching the end of the stream. Fixes [#6411](https://github.com/aws/aws-sdk-java-v2/issues/6411)."
6+
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/AsyncResponseClassSpec.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424
import com.squareup.javapoet.TypeSpec;
2525
import com.squareup.javapoet.WildcardTypeName;
2626
import java.util.Collections;
27-
import java.util.Iterator;
2827
import java.util.Objects;
2928
import java.util.concurrent.CompletableFuture;
3029
import java.util.function.Consumer;
31-
import java.util.function.Function;
3230
import java.util.stream.Collectors;
3331
import java.util.stream.Stream;
3432
import javax.lang.model.element.Modifier;
@@ -40,7 +38,6 @@
4038
import software.amazon.awssdk.codegen.poet.PoetUtils;
4139
import software.amazon.awssdk.core.async.SdkPublisher;
4240
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
43-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
4441
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
4542

4643
/**
@@ -200,21 +197,23 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
200197
return null;
201198
}
202199

200+
String fluentGetter = fluentGetterMethodForResponseMember(resultKey);
201+
202+
CodeBlock.Builder iterableFnBuilder = CodeBlock.builder()
203+
.add("$1N -> $1N.$2L", RESPONSE_LITERAL, fluentGetter);
204+
205+
String fluentGetterMethodName = resultKeyModel.getFluentGetterMethodName();
206+
207+
if (resultKeyModel.isMap()) {
208+
iterableFnBuilder.add(".entrySet()");
209+
}
210+
203211
TypeName resultKeyType = getTypeForResultKey(resultKey);
204212

205-
return MethodSpec.methodBuilder(resultKeyModel.getFluentGetterMethodName())
213+
return MethodSpec.methodBuilder(fluentGetterMethodName)
206214
.addModifiers(Modifier.PUBLIC, Modifier.FINAL)
207215
.returns(ParameterizedTypeName.get(ClassName.get(SdkPublisher.class), resultKeyType))
208-
.addCode("$T getIterator = ",
209-
ParameterizedTypeName.get(ClassName.get(Function.class),
210-
responseType(),
211-
ParameterizedTypeName.get(ClassName.get(Iterator.class),
212-
resultKeyType)))
213-
.addCode(getIteratorLambdaBlock(resultKey, resultKeyModel))
214-
.addCode("\n")
215-
.addStatement("return $1T.builder().$2L(new $3L()).iteratorFunction(getIterator).$4L($4L).build()",
216-
PaginatedItemsPublisher.class, NEXT_PAGE_FETCHER_MEMBER, nextPageFetcherClassName(),
217-
LAST_PAGE_FIELD)
216+
.addStatement("return this.flatMapIterable($L)", iterableFnBuilder.build())
218217
.addJavadoc(CodeBlock.builder()
219218
.add("Returns a publisher that can be used to get a stream of data. You need to "
220219
+ "subscribe to the publisher to request the stream of data. The publisher "

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyAndMoreResultsPublisher.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package software.amazon.awssdk.services.jsonprotocoltests.paginators;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import java.util.concurrent.CompletableFuture;
6-
import java.util.function.Function;
74
import org.reactivestreams.Subscriber;
85
import software.amazon.awssdk.annotations.Generated;
96
import software.amazon.awssdk.core.async.SdkPublisher;
107
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
11-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
128
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
139
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
1410
import software.amazon.awssdk.services.jsonprotocoltests.internal.UserAgentUtils;
@@ -107,15 +103,7 @@ public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyAndMoreR
107103
* and then applies that consumer to each response returned by the service.
108104
*/
109105
public final SdkPublisher<SimpleStruct> items() {
110-
Function<PaginatedOperationWithResultKeyAndMoreResultsResponse, Iterator<SimpleStruct>> getIterator = response -> {
111-
if (response != null && response.items() != null) {
112-
return response.items().iterator();
113-
}
114-
return Collections.emptyIterator();
115-
};
116-
return PaginatedItemsPublisher.builder()
117-
.nextPageFetcher(new PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher())
118-
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
106+
return this.flatMapIterable(response -> response.items());
119107
}
120108

121109
private class PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher implements

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package software.amazon.awssdk.services.jsonprotocoltests.paginators;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import java.util.concurrent.CompletableFuture;
6-
import java.util.function.Function;
74
import org.reactivestreams.Subscriber;
85
import software.amazon.awssdk.annotations.Generated;
96
import software.amazon.awssdk.core.async.SdkPublisher;
107
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
11-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
128
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
139
import software.amazon.awssdk.core.util.PaginatorUtils;
1410
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
@@ -107,14 +103,7 @@ public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyResponse
107103
* and then applies that consumer to each response returned by the service.
108104
*/
109105
public final SdkPublisher<SimpleStruct> items() {
110-
Function<PaginatedOperationWithResultKeyResponse, Iterator<SimpleStruct>> getIterator = response -> {
111-
if (response != null && response.items() != null) {
112-
return response.items().iterator();
113-
}
114-
return Collections.emptyIterator();
115-
};
116-
return PaginatedItemsPublisher.builder().nextPageFetcher(new PaginatedOperationWithResultKeyResponseFetcher())
117-
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
106+
return this.flatMapIterable(response -> response.items());
118107
}
119108

120109
private class PaginatedOperationWithResultKeyResponseFetcher implements

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiPublisher.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package software.amazon.awssdk.services.jsonprotocoltests.paginators;
22

3-
import java.util.Collections;
4-
import java.util.Iterator;
53
import java.util.concurrent.CompletableFuture;
6-
import java.util.function.Function;
74
import org.reactivestreams.Subscriber;
85
import software.amazon.awssdk.annotations.Generated;
96
import software.amazon.awssdk.core.async.SdkPublisher;
107
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
11-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
128
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
139
import software.amazon.awssdk.core.util.PaginatorUtils;
1410
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
@@ -85,7 +81,7 @@ public SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Same
8581
}
8682

8783
private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, SameTokenPaginationApiRequest firstRequest,
88-
boolean isLastPage) {
84+
boolean isLastPage) {
8985
this.client = client;
9086
this.firstRequest = firstRequest;
9187
this.isLastPage = isLastPage;
@@ -94,7 +90,7 @@ private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Sam
9490
@Override
9591
public void subscribe(Subscriber<? super SameTokenPaginationApiResponse> subscriber) {
9692
subscriber.onSubscribe(ResponsesSubscription.builder().subscriber(subscriber)
97-
.nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build());
93+
.nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build());
9894
}
9995

10096
/**
@@ -103,14 +99,7 @@ public void subscribe(Subscriber<? super SameTokenPaginationApiResponse> subscri
10399
* and then applies that consumer to each response returned by the service.
104100
*/
105101
public final SdkPublisher<SimpleStruct> items() {
106-
Function<SameTokenPaginationApiResponse, Iterator<SimpleStruct>> getIterator = response -> {
107-
if (response != null && response.items() != null) {
108-
return response.items().iterator();
109-
}
110-
return Collections.emptyIterator();
111-
};
112-
return PaginatedItemsPublisher.builder().nextPageFetcher(new SameTokenPaginationApiResponseFetcher())
113-
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
102+
return this.flatMapIterable(response -> response.items());
114103
}
115104

116105
private class SameTokenPaginationApiResponseFetcher implements AsyncPageFetcher<SameTokenPaginationApiResponse> {

core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @param <ItemT> The type of paginated member in a response page
3131
*/
3232
@SdkProtectedApi
33+
@Deprecated
3334
public final class PaginatedItemsPublisher<ResponseT, ItemT> implements SdkPublisher<ItemT> {
3435

3536
private final AsyncPageFetcher<ResponseT> nextPageFetcher;

core/sdk-core/src/test/java/utils/SdkSubscriberTest.java

Lines changed: 44 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,22 @@
1515

1616
package utils;
1717

18+
import io.reactivex.Flowable;
1819
import org.junit.Before;
1920
import org.junit.Test;
2021
import org.junit.runner.RunWith;
2122
import org.mockito.Mock;
2223
import org.mockito.Mockito;
2324
import org.mockito.junit.MockitoJUnitRunner;
2425
import org.reactivestreams.Subscriber;
25-
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
26-
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
26+
import software.amazon.awssdk.core.async.SdkPublisher;
2727
import software.amazon.awssdk.utils.async.LimitingSubscriber;
2828
import software.amazon.awssdk.utils.internal.async.EmptySubscription;
2929

3030
import java.util.ArrayList;
3131
import java.util.Arrays;
3232
import java.util.Iterator;
3333
import java.util.List;
34-
import java.util.concurrent.CompletableFuture;
3534
import java.util.concurrent.ExecutionException;
3635
import java.util.concurrent.ExecutorService;
3736
import java.util.concurrent.Executors;
@@ -48,96 +47,83 @@ public class SdkSubscriberTest {
4847

4948
public static final Function<Integer, Iterator<Integer>> SAMPLE_ITERATOR = response -> Arrays.asList(1, 2, 3, 4, 5, 6).listIterator();
5049
public static final Function<Integer, Iterator<Integer>> EMPTY_ITERATOR = response -> new ArrayList<Integer>().listIterator();
51-
@Mock
52-
AsyncPageFetcher asyncPageFetcher;
53-
PaginatedItemsPublisher<Integer, Integer> itemsPublisher;
5450

5551
@Mock
5652
Subscriber<Integer> mockSubscriber;
5753

54+
private SdkPublisher<Integer> sdkPublisher;
55+
5856
@Before
5957
public void setUp() {
60-
doReturn(CompletableFuture.completedFuture(1))
61-
.when(asyncPageFetcher).nextPage(null);
62-
doReturn(false)
63-
.when(asyncPageFetcher).hasNextPage(any());
58+
sdkPublisher = SdkPublisher.adapt(Flowable.just(1, 2, 3, 4, 5, 6));
6459
}
6560

6661
@Test
6762
public void limitingSubscriber_with_different_limits() throws InterruptedException, ExecutionException, TimeoutException {
68-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
69-
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
63+
List<Integer> belowLimit = new ArrayList<>();
64+
sdkPublisher.limit(3).subscribe(belowLimit::add).get(5, TimeUnit.SECONDS);
65+
assertThat(belowLimit).containsExactly(1, 2, 3);
7066

71-
final List<Integer> belowLimit = new ArrayList<>();
72-
itemsPublisher.limit(3).subscribe(e -> belowLimit.add(e)).get(5, TimeUnit.SECONDS);
73-
assertThat(belowLimit).isEqualTo(Arrays.asList(1, 2, 3));
67+
List<Integer> beyondLimit = new ArrayList<>();
68+
sdkPublisher.limit(33).subscribe(beyondLimit::add).get(5, TimeUnit.SECONDS);
69+
assertThat(beyondLimit).containsExactly(1, 2, 3, 4, 5, 6);
7470

75-
final List<Integer> beyondLimit = new ArrayList<>();
76-
itemsPublisher.limit(33).subscribe(e -> beyondLimit.add(e)).get(5, TimeUnit.SECONDS);
77-
assertThat(beyondLimit).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
78-
79-
final List<Integer> zeroLimit = new ArrayList<>();
80-
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
81-
assertThat(zeroLimit).isEqualTo(Arrays.asList());
71+
List<Integer> zeroLimit = new ArrayList<>();
72+
sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS);
73+
assertThat(zeroLimit).isEmpty();
8274
}
8375

8476
@Test
8577
public void filteringSubscriber_with_different_filters() throws InterruptedException, ExecutionException, TimeoutException {
86-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
87-
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
88-
89-
final List<Integer> filteredSomeList = new ArrayList<>();
90-
itemsPublisher.filter(i -> i % 2 == 0).subscribe(e -> filteredSomeList.add(e)).get(5, TimeUnit.SECONDS);
91-
assertThat(filteredSomeList).isEqualTo(Arrays.asList(2, 4, 6));
9278

93-
final List<Integer> filteredAllList = new ArrayList<>();
94-
itemsPublisher.filter(i -> i % 10 == 0).subscribe(e -> filteredAllList.add(e)).get(5, TimeUnit.SECONDS);
95-
assertThat(filteredAllList).isEqualTo(Arrays.asList());
79+
List<Integer> filteredSomeList = new ArrayList<>();
80+
sdkPublisher.filter(i -> i % 2 == 0).subscribe(filteredSomeList::add).get(5, TimeUnit.SECONDS);
81+
assertThat(filteredSomeList).containsExactly(2, 4, 6);
9682

97-
final List<Integer> filteredNone = new ArrayList<>();
98-
itemsPublisher.filter(i -> i % 1 == 0).subscribe(e -> filteredNone.add(e)).get(5, TimeUnit.SECONDS);
99-
assertThat(filteredNone).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
83+
List<Integer> filteredAllList = new ArrayList<>();
84+
sdkPublisher.filter(i -> i % 10 == 0).subscribe(filteredAllList::add).get(5, TimeUnit.SECONDS);
85+
assertThat(filteredAllList).isEmpty();
10086

87+
List<Integer> filteredNone = new ArrayList<>();
88+
sdkPublisher.filter(i -> i % 1 == 0).subscribe(filteredNone::add).get(5, TimeUnit.SECONDS);
89+
assertThat(filteredNone).containsExactly(1, 2, 3, 4, 5, 6);
10190
}
10291

10392
@Test
10493
public void limit_and_filter_subscriber_chained_with_different_conditions() throws InterruptedException, ExecutionException, TimeoutException {
105-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
106-
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
10794

108-
final List<Integer> belowLimitWithFiltering = new ArrayList<>();
109-
itemsPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(e -> belowLimitWithFiltering.add(e)).get(5, TimeUnit.SECONDS);
110-
assertThat(belowLimitWithFiltering).isEqualTo(Arrays.asList(2, 4));
95+
List<Integer> belowLimitWithFiltering = new ArrayList<>();
96+
sdkPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(belowLimitWithFiltering::add).get(5, TimeUnit.SECONDS);
97+
assertThat(belowLimitWithFiltering).containsExactly(2, 4);
11198

112-
final List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
113-
itemsPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(e -> beyondLimitWithAllFiltering.add(e)).get(5, TimeUnit.SECONDS);
114-
assertThat(beyondLimitWithAllFiltering).isEqualTo(Arrays.asList());
99+
List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
100+
sdkPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(beyondLimitWithAllFiltering::add).get(5, TimeUnit.SECONDS);
101+
assertThat(beyondLimitWithAllFiltering).isEmpty();
115102

116-
final List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
117-
itemsPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(e -> zeroLimitAndNoFilter.add(e)).get(5, TimeUnit.SECONDS);
118-
assertThat(zeroLimitAndNoFilter).isEqualTo(Arrays.asList());
103+
List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
104+
sdkPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(zeroLimitAndNoFilter::add).get(5, TimeUnit.SECONDS);
105+
assertThat(zeroLimitAndNoFilter).isEmpty();
119106

120-
final List<Integer> filteringbelowLimitWith = new ArrayList<>();
121-
itemsPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(e -> filteringbelowLimitWith.add(e)).get(5, TimeUnit.SECONDS);
122-
assertThat(filteringbelowLimitWith).isEqualTo(Arrays.asList(2, 4));
107+
List<Integer> filteringbelowLimitWith = new ArrayList<>();
108+
sdkPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(filteringbelowLimitWith::add).get(5, TimeUnit.SECONDS);
109+
assertThat(filteringbelowLimitWith).containsExactly(2, 4);
123110

124-
final List<Integer> filteringAndOutsideLimit = new ArrayList<>();
125-
itemsPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(e -> filteringAndOutsideLimit.add(e)).get(5, TimeUnit.SECONDS);
126-
assertThat(filteringAndOutsideLimit).isEqualTo(Arrays.asList());
111+
List<Integer> filteringAndOutsideLimit = new ArrayList<>();
112+
sdkPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(filteringAndOutsideLimit::add).get(5, TimeUnit.SECONDS);
113+
assertThat(filteringAndOutsideLimit).isEmpty();
127114
}
128115

129116
@Test
130117
public void limit__subscriber_with_empty_input_and_zero_limit() throws Exception {
131-
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
132-
.iteratorFunction(EMPTY_ITERATOR).isLastPage(false).build();
118+
sdkPublisher = SdkPublisher.adapt(Flowable.empty());
133119

134-
final List<Integer> zeroLimit = new ArrayList<>();
135-
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
136-
assertThat(zeroLimit).isEqualTo(Arrays.asList());
120+
List<Integer> zeroLimit = new ArrayList<>();
121+
sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS);
122+
assertThat(zeroLimit).isEmpty();
137123

138124
List<Integer> nonZeroLimit = new ArrayList<>();
139-
itemsPublisher.limit(10).subscribe(e -> nonZeroLimit.add(e)).get(5, TimeUnit.SECONDS);
140-
assertThat(zeroLimit).isEqualTo(Arrays.asList());
125+
sdkPublisher.limit(10).subscribe(nonZeroLimit::add).get(5, TimeUnit.SECONDS);
126+
assertThat(zeroLimit).isEmpty();
141127
}
142128

143129

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"pagination": {
3+
"ListStrings": {
4+
"input_token": "NextToken",
5+
"limit_key": "MaxResults",
6+
"output_token": "NextToken",
7+
"result_key": "Strings"
8+
}
9+
}
10+
}

0 commit comments

Comments
 (0)