Skip to content

Commit 2caf026

Browse files
feat(gax-grpc): add configurable resize delta and warning for repeated resizing (#12838)
Co-authored-by: cloud-java-bot <cloud-java-bot@google.com>
1 parent 338a7ad commit 2caf026

File tree

3 files changed

+236
-13
lines changed

3 files changed

+236
-13
lines changed

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@
6969
* <p>Package-private for internal use.
7070
*/
7171
class ChannelPool extends ManagedChannel {
72+
static final String CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING =
73+
"Channel pool is repeatedly resizing. "
74+
+ "Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. "
75+
+ "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging "
76+
+ "and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior.";
7277
@VisibleForTesting static final Logger LOG = Logger.getLogger(ChannelPool.class.getName());
7378
private static final java.time.Duration REFRESH_PERIOD = java.time.Duration.ofMinutes(50);
7479

@@ -84,6 +89,16 @@ class ChannelPool extends ManagedChannel {
8489
private final AtomicInteger indexTicker = new AtomicInteger();
8590
private final String authority;
8691

92+
// The number of consecutive resize cycles to wait before logging a warning about repeated
93+
// resizing. This value was chosen to detect repeated requests for changes (multiple continuous
94+
// increase or decrease attempts) without being too sensitive.
95+
private static final int CONSECUTIVE_RESIZE_THRESHOLD = 5;
96+
97+
// Tracks the number of consecutive resize cycles where a resize actually occurred (either expand
98+
// or shrink). Used to detect repeated resizing activity and log a warning.
99+
// Note: This field is only accessed safely within resizeSafely() and does not need to be atomic.
100+
private int consecutiveResizes = 0;
101+
87102
static ChannelPool create(
88103
ChannelPoolSettings settings,
89104
ChannelFactory channelFactory,
@@ -275,7 +290,8 @@ private void resizeSafely() {
275290
* <li>Get the maximum number of outstanding RPCs since last invocation
276291
* <li>Determine a valid range of number of channels to handle that many outstanding RPCs
277292
* <li>If the current number of channel falls outside of that range, add or remove at most
278-
* {@link ChannelPoolSettings#MAX_RESIZE_DELTA} to get closer to middle of that range.
293+
* {@link ChannelPoolSettings#DEFAULT_MAX_RESIZE_DELTA} to get closer to middle of that
294+
* range.
279295
* </ul>
280296
*
281297
* <p>Not threadsafe, must be called under the entryWriteLock monitor
@@ -313,9 +329,25 @@ void resize() {
313329
int currentSize = localEntries.size();
314330
int delta = tentativeTarget - currentSize;
315331
int dampenedTarget = tentativeTarget;
316-
if (Math.abs(delta) > ChannelPoolSettings.MAX_RESIZE_DELTA) {
317-
dampenedTarget =
318-
currentSize + (int) Math.copySign(ChannelPoolSettings.MAX_RESIZE_DELTA, delta);
332+
if (Math.abs(delta) > settings.getMaxResizeDelta()) {
333+
dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta);
334+
}
335+
336+
// Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking
337+
// that `dampenedTarget != currentSize` would cause false positives when the pool is within
338+
// bounds but not at the target (target aims for the middle of the bounds)
339+
boolean resized = (currentSize < minChannels || currentSize > maxChannels);
340+
if (resized) {
341+
consecutiveResizes++;
342+
} else {
343+
consecutiveResizes = 0;
344+
}
345+
346+
// Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log
347+
// message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit).
348+
// However, aim to log once to ensure that this does not incur log spam.
349+
if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) {
350+
LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING);
319351
}
320352

321353
// Only resize the pool when thresholds are crossed

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ public abstract class ChannelPoolSettings {
5959
static final Duration RESIZE_INTERVAL = Duration.ofMinutes(1);
6060

6161
/** The maximum number of channels that can be added or removed at a time. */
62-
static final int MAX_RESIZE_DELTA = 2;
62+
static final int DEFAULT_MAX_RESIZE_DELTA = 2;
63+
64+
// Arbitrary limit to prevent unbounded growth and protect server/client resources.
65+
// Capping at 25 ensures we don't scale too aggressively in a single cycle.
66+
private static final int MAX_ALLOWED_RESIZE_DELTA = 25;
6367

6468
/**
6569
* Threshold to start scaling down the channel pool.
@@ -92,6 +96,22 @@ public abstract class ChannelPoolSettings {
9296
*/
9397
public abstract int getMaxChannelCount();
9498

99+
/**
100+
* The maximum number of channels that can be added or removed at a time.
101+
*
102+
* <p>This setting limits the rate at which the channel pool can grow or shrink in a single resize
103+
* period. The default value is {@value #DEFAULT_MAX_RESIZE_DELTA}. Increasing this value can help
104+
* the pool better handle sudden bursts or spikes in requests by allowing it to scale up faster.
105+
* Regardless of this setting, the number of channels will never exceed {@link
106+
* #getMaxChannelCount()}.
107+
*
108+
* <p><b>Note:</b> This value cannot exceed {@value #MAX_ALLOWED_RESIZE_DELTA}.
109+
*
110+
* <p><b>Warning:</b> Higher values for resize delta may still result in performance degradation
111+
* during spikes due to rapid scaling.
112+
*/
113+
public abstract int getMaxResizeDelta();
114+
95115
/**
96116
* The initial size of the channel pool.
97117
*
@@ -116,11 +136,7 @@ boolean isStaticSize() {
116136
return true;
117137
}
118138
// When the scaling threshold are not set
119-
if (getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE) {
120-
return true;
121-
}
122-
123-
return false;
139+
return getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE;
124140
}
125141

126142
public abstract Builder toBuilder();
@@ -132,6 +148,9 @@ public static ChannelPoolSettings staticallySized(int size) {
132148
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
133149
.setMinChannelCount(size)
134150
.setMaxChannelCount(size)
151+
// Static pools don't resize so this value doesn't affect operation. However,
152+
// validation still checks that resize delta doesn't exceed channel pool size.
153+
.setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size))
135154
.build();
136155
}
137156

@@ -142,7 +161,8 @@ public static Builder builder() {
142161
.setMaxChannelCount(200)
143162
.setMinRpcsPerChannel(0)
144163
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
145-
.setPreemptiveRefreshEnabled(false);
164+
.setPreemptiveRefreshEnabled(false)
165+
.setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA);
146166
}
147167

148168
@AutoValue.Builder
@@ -159,6 +179,15 @@ public abstract static class Builder {
159179

160180
public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);
161181

182+
/**
183+
* Sets the maximum number of channels that can be added or removed in a single resize cycle.
184+
* This acts as a rate limiter to prevent wild fluctuations.
185+
*
186+
* <p><b>Warning:</b> Higher values for resize delta may still result in performance degradation
187+
* during spikes due to rapid scaling.
188+
*/
189+
public abstract Builder setMaxResizeDelta(int count);
190+
162191
abstract ChannelPoolSettings autoBuild();
163192

164193
public ChannelPoolSettings build() {
@@ -178,6 +207,14 @@ public ChannelPoolSettings build() {
178207
"initial channel count must be less than maxChannelCount");
179208
Preconditions.checkState(
180209
s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0");
210+
Preconditions.checkState(
211+
s.getMaxResizeDelta() > 0, "Max resize delta must be greater than 0");
212+
Preconditions.checkState(
213+
s.getMaxResizeDelta() <= MAX_ALLOWED_RESIZE_DELTA,
214+
"Max resize delta cannot be greater than " + MAX_ALLOWED_RESIZE_DELTA);
215+
Preconditions.checkState(
216+
s.getMaxResizeDelta() <= s.getMaxChannelCount(),
217+
"Max resize delta cannot be greater than max channel count");
181218
return s;
182219
}
183220
}

sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static com.google.api.gax.grpc.testing.FakeServiceGrpc.METHOD_RECOGNIZE;
3333
import static com.google.api.gax.grpc.testing.FakeServiceGrpc.METHOD_SERVER_STREAMING_RECOGNIZE;
3434
import static com.google.common.truth.Truth.assertThat;
35+
import static org.junit.Assert.assertThrows;
3536

3637
import com.google.api.core.ApiFuture;
3738
import com.google.api.gax.core.FixedExecutorProvider;
@@ -523,13 +524,57 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce
523524
assertThat(pool.entries.get()).hasSize(2);
524525
}
525526

527+
@Test
528+
void customResizeDeltaIsRespected() throws Exception {
529+
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
530+
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
531+
532+
List<ManagedChannel> channels = new ArrayList<>();
533+
534+
ChannelFactory channelFactory =
535+
() -> {
536+
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
537+
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
538+
.thenAnswer(
539+
invocation -> {
540+
@SuppressWarnings("unchecked")
541+
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
542+
return clientCall;
543+
});
544+
545+
channels.add(channel);
546+
return channel;
547+
};
548+
549+
pool =
550+
new ChannelPool(
551+
ChannelPoolSettings.builder()
552+
.setInitialChannelCount(2)
553+
.setMinRpcsPerChannel(1)
554+
.setMaxRpcsPerChannel(2)
555+
.setMaxResizeDelta(5)
556+
.build(),
557+
channelFactory,
558+
provider);
559+
assertThat(pool.entries.get()).hasSize(2);
560+
561+
// Add 20 RPCs to push expansion
562+
for (int i = 0; i < 20; i++) {
563+
ClientCalls.futureUnaryCall(
564+
pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance());
565+
}
566+
pool.resize();
567+
// delta is 15 - 2 = 13. Capped at maxResizeDelta = 5.
568+
// Expected size = 2 + 5 = 7.
569+
assertThat(pool.entries.get()).hasSize(7);
570+
}
571+
526572
@Test
527573
void removedIdleChannelsAreShutdown() throws Exception {
528574
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
529575
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
530576

531577
List<ManagedChannel> channels = new ArrayList<>();
532-
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();
533578

534579
ChannelFactory channelFactory =
535580
() -> {
@@ -539,7 +584,6 @@ void removedIdleChannelsAreShutdown() throws Exception {
539584
invocation -> {
540585
@SuppressWarnings("unchecked")
541586
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
542-
startedCalls.add(clientCall);
543587
return clientCall;
544588
});
545589

@@ -679,6 +723,109 @@ public void onComplete() {}
679723
assertThat(e.getMessage()).isEqualTo("Call is already cancelled");
680724
}
681725

726+
@Test
727+
void repeatedResizingLogsWarningOnExpand() throws Exception {
728+
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
729+
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
730+
731+
ChannelFactory channelFactory =
732+
() -> {
733+
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
734+
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
735+
.thenAnswer(
736+
invocation -> {
737+
@SuppressWarnings("unchecked")
738+
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
739+
return clientCall;
740+
});
741+
return channel;
742+
};
743+
744+
pool =
745+
new ChannelPool(
746+
ChannelPoolSettings.builder()
747+
.setInitialChannelCount(1)
748+
.setMinRpcsPerChannel(1)
749+
.setMaxRpcsPerChannel(2)
750+
.setMaxResizeDelta(1)
751+
.setMinChannelCount(1)
752+
.setMaxChannelCount(10)
753+
.build(),
754+
channelFactory,
755+
provider);
756+
assertThat(pool.entries.get()).hasSize(1);
757+
758+
FakeLogHandler logHandler = new FakeLogHandler();
759+
ChannelPool.LOG.addHandler(logHandler);
760+
761+
try {
762+
// Add 20 RPCs to push expansion
763+
for (int i = 0; i < 20; i++) {
764+
ClientCalls.futureUnaryCall(
765+
pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance());
766+
}
767+
768+
// Resize 4 times, should not log warning yet
769+
for (int i = 0; i < 4; i++) {
770+
pool.resize();
771+
}
772+
assertThat(logHandler.getAllMessages()).isEmpty();
773+
774+
// 5th resize, should log warning
775+
pool.resize();
776+
assertThat(logHandler.getAllMessages()).hasSize(1);
777+
assertThat(logHandler.getAllMessages())
778+
.contains(ChannelPool.CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING);
779+
780+
// 6th resize, should not log again
781+
pool.resize();
782+
assertThat(logHandler.getAllMessages()).hasSize(1);
783+
} finally {
784+
ChannelPool.LOG.removeHandler(logHandler);
785+
}
786+
}
787+
788+
@Test
789+
void repeatedResizingLogsWarningOnShrink() throws Exception {
790+
ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
791+
FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
792+
793+
ChannelFactory channelFactory = () -> Mockito.mock(ManagedChannel.class);
794+
795+
pool =
796+
new ChannelPool(
797+
ChannelPoolSettings.builder()
798+
.setInitialChannelCount(10)
799+
.setMinRpcsPerChannel(1)
800+
.setMaxRpcsPerChannel(2)
801+
.setMaxResizeDelta(1)
802+
.setMinChannelCount(1)
803+
.setMaxChannelCount(10)
804+
.build(),
805+
channelFactory,
806+
provider);
807+
assertThat(pool.entries.get()).hasSize(10);
808+
809+
FakeLogHandler logHandler = new FakeLogHandler();
810+
ChannelPool.LOG.addHandler(logHandler);
811+
812+
try {
813+
// 0 RPCs, should shrink every cycle
814+
// Resize 4 times, should not log warning yet
815+
for (int i = 0; i < 4; i++) {
816+
pool.resize();
817+
}
818+
assertThat(logHandler.getAllMessages()).isEmpty();
819+
820+
// 5th resize, should log warning
821+
pool.resize();
822+
assertThat(logHandler.getAllMessages())
823+
.contains(ChannelPool.CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING);
824+
} finally {
825+
ChannelPool.LOG.removeHandler(logHandler);
826+
}
827+
}
828+
682829
@Test
683830
void testDoubleRelease() throws Exception {
684831
FakeLogHandler logHandler = new FakeLogHandler();
@@ -737,4 +884,11 @@ void testDoubleRelease() throws Exception {
737884
ChannelPool.LOG.removeHandler(logHandler);
738885
}
739886
}
887+
888+
@Test
889+
void settingsValidationFailsWhenMaxResizeDeltaExceedsLimit() {
890+
ChannelPoolSettings.Builder builder =
891+
ChannelPoolSettings.builder().setMaxResizeDelta(26).setMaxChannelCount(30);
892+
assertThrows(IllegalStateException.class, builder::build);
893+
}
740894
}

0 commit comments

Comments
 (0)