Skip to content

Commit 368a927

Browse files
committed
feat(gax-grpc): add configurable resize delta and warning for repeated resizing
1 parent 8095342 commit 368a927

4 files changed

Lines changed: 202 additions & 9 deletions

File tree

sdk-platform-java/gax-java/gax-grpc/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,6 @@
142142
<plugin>
143143
<groupId>org.apache.maven.plugins</groupId>
144144
<artifactId>maven-surefire-plugin</artifactId>
145-
<configuration>
146-
<!-- These tests require an Env Var to be set. Use -PenvVarTest to ONLY run these tests -->
147-
<test>!InstantiatingGrpcChannelProviderTest#testLogDirectPathMisconfig_AttemptDirectPathNotSetAndAttemptDirectPathXdsSetViaEnv_warns,!InstantiatingGrpcChannelProviderTest#canUseDirectPath_directPathEnvVarNotSet_attemptDirectPathIsTrue,InstantiatingGrpcChannelProviderTest#testLogDirectPathMisconfigWrongCredential</test>
148-
<!-- <test>!InstantiatingGrpcChannelProviderTest#testLogDirectPathMisconfig_AttemptDirectPathNotSetAndAttemptDirectPathXdsSetViaEnv_warns</test> -->
149-
</configuration>
150145
</plugin>
151146
</plugins>
152147
</build>

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ class ChannelPool extends ManagedChannel {
8484
private final AtomicInteger indexTicker = new AtomicInteger();
8585
private final String authority;
8686

87+
private int consecutiveResizes = 0;
88+
8789
static ChannelPool create(
8890
ChannelPoolSettings settings,
8991
ChannelFactory channelFactory,
@@ -313,9 +315,21 @@ void resize() {
313315
int currentSize = localEntries.size();
314316
int delta = tentativeTarget - currentSize;
315317
int dampenedTarget = tentativeTarget;
316-
if (Math.abs(delta) > ChannelPoolSettings.MAX_RESIZE_DELTA) {
317-
dampenedTarget =
318-
currentSize + (int) Math.copySign(ChannelPoolSettings.MAX_RESIZE_DELTA, delta);
318+
if (Math.abs(delta) > settings.getMaxResizeDelta()) {
319+
dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta);
320+
}
321+
322+
boolean resized = (localEntries.size() < minChannels || localEntries.size() > maxChannels);
323+
if (resized) {
324+
consecutiveResizes++;
325+
} else {
326+
consecutiveResizes = 0;
327+
}
328+
329+
if (consecutiveResizes == 5) {
330+
LOG.warning(
331+
"Channel pool is repeatedly resizing. Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
332+
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.");
319333
}
320334

321335
// Only resize the pool when thresholds are crossed

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ public abstract class ChannelPoolSettings {
9292
*/
9393
public abstract int getMaxChannelCount();
9494

95+
/**
96+
* The maximum number of channels that can be added or removed at a time.
97+
*
98+
* <p>This setting limits the rate at which the channel pool can grow or shrink in a single resize
99+
* period. The default value is 2. Regardless of this setting, the number of channels will never
100+
* exceed {@link #getMaxChannelCount()}.
101+
*/
102+
public abstract int getMaxResizeDelta();
103+
95104
/**
96105
* The initial size of the channel pool.
97106
*
@@ -132,6 +141,7 @@ public static ChannelPoolSettings staticallySized(int size) {
132141
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
133142
.setMinChannelCount(size)
134143
.setMaxChannelCount(size)
144+
.setMaxResizeDelta(Math.min(2, size))
135145
.build();
136146
}
137147

@@ -142,7 +152,8 @@ public static Builder builder() {
142152
.setMaxChannelCount(200)
143153
.setMinRpcsPerChannel(0)
144154
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
145-
.setPreemptiveRefreshEnabled(false);
155+
.setPreemptiveRefreshEnabled(false)
156+
.setMaxResizeDelta(2);
146157
}
147158

148159
@AutoValue.Builder
@@ -159,6 +170,8 @@ public abstract static class Builder {
159170

160171
public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);
161172

173+
public abstract Builder setMaxResizeDelta(int count);
174+
162175
abstract ChannelPoolSettings autoBuild();
163176

164177
public ChannelPoolSettings build() {
@@ -178,6 +191,11 @@ public ChannelPoolSettings build() {
178191
"initial channel count must be less than maxChannelCount");
179192
Preconditions.checkState(
180193
s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0");
194+
Preconditions.checkState(
195+
s.getMaxResizeDelta() > 0, "Max resize delta must be greater than 0");
196+
Preconditions.checkState(
197+
s.getMaxResizeDelta() <= s.getMaxChannelCount(),
198+
"Max resize delta cannot be greater than max channel count");
181199
return s;
182200
}
183201
}

sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,53 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce
523523
assertThat(pool.entries.get()).hasSize(2);
524524
}
525525

526+
@Test
527+
void customResizeDeltaIsRespected() throws Exception {
528+
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
529+
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
530+
531+
List<ManagedChannel> channels = new ArrayList<>();
532+
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();
533+
534+
ChannelFactory channelFactory =
535+
() -> {
536+
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
537+
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
538+
.thenAnswer(
539+
invocation -> {
540+
@SuppressWarnings("unchecked")
541+
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
542+
startedCalls.add(clientCall);
543+
return clientCall;
544+
});
545+
546+
channels.add(channel);
547+
return channel;
548+
};
549+
550+
pool =
551+
new ChannelPool(
552+
ChannelPoolSettings.builder()
553+
.setInitialChannelCount(2)
554+
.setMinRpcsPerChannel(1)
555+
.setMaxRpcsPerChannel(2)
556+
.setMaxResizeDelta(5)
557+
.build(),
558+
channelFactory,
559+
provider);
560+
assertThat(pool.entries.get()).hasSize(2);
561+
562+
// Add 20 RPCs to push expansion
563+
for (int i = 0; i < 20; i++) {
564+
ClientCalls.futureUnaryCall(
565+
pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance());
566+
}
567+
pool.resize();
568+
// delta is 15 - 2 = 13. Capped at maxResizeDelta = 5.
569+
// Expected size = 2 + 5 = 7.
570+
assertThat(pool.entries.get()).hasSize(7);
571+
}
572+
526573
@Test
527574
void removedIdleChannelsAreShutdown() throws Exception {
528575
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
@@ -679,6 +726,125 @@ public void onComplete() {}
679726
assertThat(e.getMessage()).isEqualTo("Call is already cancelled");
680727
}
681728

729+
@Test
730+
void repeatedResizingLogsWarningOnExpand() throws Exception {
731+
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
732+
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
733+
734+
List<ManagedChannel> channels = new ArrayList<>();
735+
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();
736+
737+
ChannelFactory channelFactory =
738+
() -> {
739+
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
740+
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
741+
.thenAnswer(
742+
invocation -> {
743+
@SuppressWarnings("unchecked")
744+
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
745+
startedCalls.add(clientCall);
746+
return clientCall;
747+
});
748+
749+
channels.add(channel);
750+
return channel;
751+
};
752+
753+
pool =
754+
new ChannelPool(
755+
ChannelPoolSettings.builder()
756+
.setInitialChannelCount(1)
757+
.setMinRpcsPerChannel(1)
758+
.setMaxRpcsPerChannel(2)
759+
.setMaxResizeDelta(1)
760+
.setMinChannelCount(1)
761+
.setMaxChannelCount(10)
762+
.build(),
763+
channelFactory,
764+
provider);
765+
assertThat(pool.entries.get()).hasSize(1);
766+
767+
FakeLogHandler logHandler = new FakeLogHandler();
768+
ChannelPool.LOG.addHandler(logHandler);
769+
770+
try {
771+
// Add 20 RPCs to push expansion
772+
for (int i = 0; i < 20; i++) {
773+
ClientCalls.futureUnaryCall(
774+
pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance());
775+
}
776+
777+
// Resize 4 times, should not log warning yet
778+
for (int i = 0; i < 4; i++) {
779+
pool.resize();
780+
}
781+
assertThat(logHandler.getAllMessages()).isEmpty();
782+
783+
// 5th resize, should log warning
784+
pool.resize();
785+
assertThat(logHandler.getAllMessages()).hasSize(1);
786+
assertThat(logHandler.getAllMessages())
787+
.contains(
788+
"Channel pool is repeatedly resizing. Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
789+
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.");
790+
791+
// 6th resize, should not log again
792+
pool.resize();
793+
assertThat(logHandler.getAllMessages()).hasSize(1);
794+
} finally {
795+
ChannelPool.LOG.removeHandler(logHandler);
796+
}
797+
}
798+
799+
@Test
800+
void repeatedResizingLogsWarningOnShrink() throws Exception {
801+
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
802+
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
803+
804+
List<ManagedChannel> channels = new ArrayList<>();
805+
ChannelFactory channelFactory =
806+
() -> {
807+
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
808+
channels.add(channel);
809+
return channel;
810+
};
811+
812+
pool =
813+
new ChannelPool(
814+
ChannelPoolSettings.builder()
815+
.setInitialChannelCount(10)
816+
.setMinRpcsPerChannel(1)
817+
.setMaxRpcsPerChannel(2)
818+
.setMaxResizeDelta(1)
819+
.setMinChannelCount(1)
820+
.setMaxChannelCount(10)
821+
.build(),
822+
channelFactory,
823+
provider);
824+
assertThat(pool.entries.get()).hasSize(10);
825+
826+
FakeLogHandler logHandler = new FakeLogHandler();
827+
ChannelPool.LOG.addHandler(logHandler);
828+
829+
try {
830+
// 0 RPCs, should shrink every cycle
831+
// Resize 4 times, should not log warning yet
832+
for (int i = 0; i < 4; i++) {
833+
pool.resize();
834+
}
835+
assertThat(logHandler.getAllMessages()).isEmpty();
836+
837+
// 5th resize, should log warning
838+
pool.resize();
839+
assertThat(logHandler.getAllMessages())
840+
.contains(
841+
"Channel pool is repeatedly resizing. Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
842+
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.");
843+
} finally {
844+
ChannelPool.LOG.removeHandler(logHandler);
845+
}
846+
}
847+
682848
@Test
683849
void testDoubleRelease() throws Exception {
684850
FakeLogHandler logHandler = new FakeLogHandler();

0 commit comments

Comments
 (0)