Skip to content

Commit 419720b

Browse files
committed
feat(gax): implement dynamic channel refreshing on 401 retries
1 parent 39e93fe commit 419720b

11 files changed

Lines changed: 196 additions & 19 deletions

File tree

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class ChannelPool extends ManagedChannel {
8282
private ScheduledFuture<?> resizeFuture = null;
8383

8484
private final Object entryWriteLock = new Object();
85+
private long lastRefreshTimeNanos = 0;
8586
@VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>();
8687
private final AtomicInteger indexTicker = new AtomicInteger();
8788
private final String authority;
@@ -441,6 +442,13 @@ void refresh() {
441442
// - then thread2 will shut down channel that thread1 will put back into circulation (after it
442443
// replaces the list)
443444
synchronized (entryWriteLock) {
445+
long now = System.nanoTime();
446+
if (now - lastRefreshTimeNanos < TimeUnit.SECONDS.toNanos(5)) {
447+
LOG.fine("Channel pool was refreshed recently, skipping duplicate refresh");
448+
return;
449+
}
450+
lastRefreshTimeNanos = now;
451+
444452
LOG.fine("Refreshing all channels");
445453
ArrayList<Entry> newEntries = new ArrayList<>(entries.get());
446454

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public final class GrpcCallContext implements ApiCallContext {
9797
private final ApiCallContextOptions options;
9898
private final EndpointContext endpointContext;
9999
private final boolean isDirectPath;
100+
@Nullable private final TransportChannel transportChannel;
100101

101102
/** Returns an empty instance with a null channel and default {@link CallOptions}. */
102103
public static GrpcCallContext createDefault() {
@@ -113,7 +114,8 @@ public static GrpcCallContext createDefault() {
113114
null,
114115
null,
115116
null,
116-
false);
117+
false,
118+
null);
117119
}
118120

119121
/** Returns an instance with the given channel and {@link CallOptions}. */
@@ -131,7 +133,8 @@ public static GrpcCallContext of(Channel channel, CallOptions callOptions) {
131133
null,
132134
null,
133135
null,
134-
false);
136+
false,
137+
null);
135138
}
136139

137140
private GrpcCallContext(
@@ -147,7 +150,8 @@ private GrpcCallContext(
147150
@Nullable RetrySettings retrySettings,
148151
@Nullable Set<StatusCode.Code> retryableCodes,
149152
@Nullable EndpointContext endpointContext,
150-
boolean isDirectPath) {
153+
boolean isDirectPath,
154+
@Nullable TransportChannel transportChannel) {
151155
this.channel = channel;
152156
this.credentials = credentials;
153157
Preconditions.checkNotNull(callOptions);
@@ -167,6 +171,7 @@ private GrpcCallContext(
167171
this.endpointContext =
168172
endpointContext == null ? EndpointContext.getDefaultInstance() : endpointContext;
169173
this.isDirectPath = isDirectPath;
174+
this.transportChannel = transportChannel;
170175
}
171176

172177
/**
@@ -208,7 +213,13 @@ public GrpcCallContext withCredentials(Credentials newCredentials) {
208213
retrySettings,
209214
retryableCodes,
210215
endpointContext,
211-
isDirectPath);
216+
isDirectPath,
217+
transportChannel);
218+
}
219+
220+
@Override
221+
public TransportChannel getTransportChannel() {
222+
return transportChannel;
212223
}
213224

214225
@Override
@@ -232,7 +243,8 @@ public GrpcCallContext withTransportChannel(TransportChannel inputChannel) {
232243
retrySettings,
233244
retryableCodes,
234245
endpointContext,
235-
transportChannel.isDirectPath());
246+
transportChannel.isDirectPath(),
247+
inputChannel);
236248
}
237249

238250
@Override
@@ -251,7 +263,8 @@ public GrpcCallContext withEndpointContext(EndpointContext endpointContext) {
251263
retrySettings,
252264
retryableCodes,
253265
endpointContext,
254-
isDirectPath);
266+
isDirectPath,
267+
transportChannel);
255268
}
256269

257270
/** This method is obsolete. Use {@link #withTimeoutDuration(java.time.Duration)} instead. */
@@ -286,7 +299,8 @@ public GrpcCallContext withTimeoutDuration(@Nullable java.time.Duration timeout)
286299
retrySettings,
287300
retryableCodes,
288301
endpointContext,
289-
isDirectPath);
302+
isDirectPath,
303+
transportChannel);
290304
}
291305

292306
/** This method is obsolete. Use {@link #getTimeoutDuration()} instead. */
@@ -335,7 +349,8 @@ public GrpcCallContext withStreamWaitTimeoutDuration(
335349
retrySettings,
336350
retryableCodes,
337351
endpointContext,
338-
isDirectPath);
352+
isDirectPath,
353+
transportChannel);
339354
}
340355

341356
/**
@@ -370,7 +385,8 @@ public GrpcCallContext withStreamIdleTimeoutDuration(
370385
retrySettings,
371386
retryableCodes,
372387
endpointContext,
373-
isDirectPath);
388+
isDirectPath,
389+
transportChannel);
374390
}
375391

376392
@BetaApi("The surface for channel affinity is not stable yet and may change in the future.")
@@ -388,7 +404,8 @@ public GrpcCallContext withChannelAffinity(@Nullable Integer affinity) {
388404
retrySettings,
389405
retryableCodes,
390406
endpointContext,
391-
isDirectPath);
407+
isDirectPath,
408+
transportChannel);
392409
}
393410

394411
@BetaApi("The surface for extra headers is not stable yet and may change in the future.")
@@ -410,7 +427,8 @@ public GrpcCallContext withExtraHeaders(Map<String, List<String>> extraHeaders)
410427
retrySettings,
411428
retryableCodes,
412429
endpointContext,
413-
isDirectPath);
430+
isDirectPath,
431+
transportChannel);
414432
}
415433

416434
@Override
@@ -433,7 +451,8 @@ public GrpcCallContext withRetrySettings(RetrySettings retrySettings) {
433451
retrySettings,
434452
retryableCodes,
435453
endpointContext,
436-
isDirectPath);
454+
isDirectPath,
455+
transportChannel);
437456
}
438457

439458
@Override
@@ -456,7 +475,8 @@ public GrpcCallContext withRetryableCodes(Set<StatusCode.Code> retryableCodes) {
456475
retrySettings,
457476
retryableCodes,
458477
endpointContext,
459-
isDirectPath);
478+
isDirectPath,
479+
transportChannel);
460480
}
461481

462482
@Override
@@ -558,7 +578,8 @@ public ApiCallContext merge(ApiCallContext inputCallContext) {
558578
newRetrySettings,
559579
newRetryableCodes,
560580
endpointContext,
561-
newIsDirectPath);
581+
newIsDirectPath,
582+
transportChannel);
562583
}
563584

564585
/** The {@link Channel} set on this context. */
@@ -641,7 +662,8 @@ public GrpcCallContext withChannel(Channel newChannel) {
641662
retrySettings,
642663
retryableCodes,
643664
endpointContext,
644-
isDirectPath);
665+
isDirectPath,
666+
transportChannel);
645667
}
646668

647669
/** Returns a new instance with the call options set to the given call options. */
@@ -659,7 +681,8 @@ public GrpcCallContext withCallOptions(CallOptions newCallOptions) {
659681
retrySettings,
660682
retryableCodes,
661683
endpointContext,
662-
isDirectPath);
684+
isDirectPath,
685+
transportChannel);
663686
}
664687

665688
public GrpcCallContext withRequestParamsDynamicHeaderOption(String requestParams) {
@@ -704,7 +727,8 @@ public <T> GrpcCallContext withOption(Key<T> key, T value) {
704727
retrySettings,
705728
retryableCodes,
706729
endpointContext,
707-
isDirectPath);
730+
isDirectPath,
731+
transportChannel);
708732
}
709733

710734
/** {@inheritDoc} */

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcTransportChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ public Channel getChannel() {
6666
return getManagedChannel();
6767
}
6868

69+
@Override
70+
public void refresh() {
71+
Channel channel = getChannel();
72+
if (channel instanceof ChannelPool) {
73+
((ChannelPool) channel).refresh();
74+
}
75+
}
76+
6977
@Override
7078
public void shutdown() {
7179
getManagedChannel().shutdown();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.retrying;
31+
32+
import com.google.api.core.BetaApi;
33+
34+
/**
35+
* A listener that is notified in between retry attempts, allowing custom transport-level
36+
* side effects to be executed out-of-band.
37+
*
38+
* @param <ResponseT> response type
39+
*/
40+
@BetaApi("The surface for attempt listeners is not stable yet and may change in the future.")
41+
public interface AttemptListener<ResponseT> {
42+
/** Called in between retry attempts before the next attempt task is scheduled or fired. */
43+
void onAttemptComplete(RetryingFuture<ResponseT> retryingFuture);
44+
}

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public TimedAttemptSettings getAttemptSettings() {
116116
}
117117
}
118118

119+
@Override
120+
public RetryingContext getRetryingContext() {
121+
return retryingContext;
122+
}
123+
119124
@Override
120125
public ApiFuture<ResponseT> peekAttemptResult() {
121126
synchronized (lock) {

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/retrying/RetryingFuture.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,12 @@ public interface RetryingFuture<ResponseT> extends ApiFuture<ResponseT> {
128128
* </ul>
129129
*/
130130
ApiFuture<ResponseT> getAttemptResult();
131+
132+
/**
133+
* Returns the retrying context associated with this future, or {@code null} if none is set.
134+
*/
135+
@com.google.api.core.BetaApi("The surface for passing per operation state is not yet stable")
136+
default RetryingContext getRetryingContext() {
137+
return null;
138+
}
131139
}

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class ScheduledRetryingExecutor<ResponseT>
5959

6060
private final RetryAlgorithm<ResponseT> retryAlgorithm;
6161
private final ListeningScheduledExecutorService scheduler;
62+
private final AttemptListener<ResponseT> attemptListener;
6263

6364
/**
6465
* Creates a new scheduled retry executor, which will be using {@code scheduler} for actual
@@ -69,8 +70,24 @@ public class ScheduledRetryingExecutor<ResponseT>
6970
*/
7071
public ScheduledRetryingExecutor(
7172
RetryAlgorithm<ResponseT> retryAlgorithm, ScheduledExecutorService scheduler) {
73+
this(retryAlgorithm, scheduler, null);
74+
}
75+
76+
/**
77+
* Creates a new scheduled retry executor with a custom {@link AttemptListener}.
78+
*
79+
* @param retryAlgorithm retry algorithm to use
80+
* @param scheduler scheduler
81+
* @param attemptListener a listener to be notified in between retry attempts
82+
*/
83+
@BetaApi("The surface for attempt listeners is not stable yet and may change in the future.")
84+
public ScheduledRetryingExecutor(
85+
RetryAlgorithm<ResponseT> retryAlgorithm,
86+
ScheduledExecutorService scheduler,
87+
AttemptListener<ResponseT> attemptListener) {
7288
this.retryAlgorithm = retryAlgorithm;
7389
this.scheduler = MoreExecutors.listeningDecorator(scheduler);
90+
this.attemptListener = attemptListener;
7491
}
7592

7693
/**
@@ -111,6 +128,10 @@ public RetryingFuture<ResponseT> createFuture(
111128
*/
112129
@Override
113130
public ApiFuture<ResponseT> submit(RetryingFuture<ResponseT> retryingFuture) {
131+
if (attemptListener != null) {
132+
attemptListener.onAttemptComplete(retryingFuture);
133+
}
134+
114135
try {
115136
ListenableFuture<ResponseT> attemptFuture =
116137
scheduler.schedule(

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ public interface ApiCallContext extends RetryingContext {
6363
/** Returns a new ApiCallContext with the given channel set. */
6464
ApiCallContext withTransportChannel(TransportChannel channel);
6565

66+
/**
67+
* Returns the {@link TransportChannel} associated with this call context, or {@code null} if none
68+
* is set.
69+
*/
70+
default TransportChannel getTransportChannel() {
71+
return null;
72+
}
73+
6674
/** Returns a new ApiCallContext with the given Endpoint Context. */
6775
ApiCallContext withEndpointContext(EndpointContext endpointContext);
6876

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ApiResultRetryAlgorithm.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<Respo
3838
/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
3939
@Override
4040
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
41+
if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment"))
42+
&& previousThrowable instanceof UnauthenticatedException) {
43+
return true;
44+
}
4145
return (previousThrowable instanceof ApiException)
4246
&& ((ApiException) previousThrowable).isRetryable();
4347
}
@@ -51,6 +55,10 @@ public boolean shouldRetry(Throwable previousThrowable, ResponseT previousRespon
5155
@Override
5256
public boolean shouldRetry(
5357
RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
58+
if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment"))
59+
&& previousThrowable instanceof UnauthenticatedException) {
60+
return true;
61+
}
5462
if (context.getRetryableCodes() != null) {
5563
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
5664
// of codes that should be retried.

0 commit comments

Comments
 (0)