Skip to content

Commit 4f508a8

Browse files
committed
feat(gax): implement dynamic channel refreshing on 401 retries
1 parent 76b290f commit 4f508a8

9 files changed

Lines changed: 88 additions & 0 deletions

File tree

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class ChannelPool extends ManagedChannel {
8282
private ScheduledFuture<?> resizeFuture = null;
8383

8484
private final Object entryWriteLock = new Object();
85+
// System.nanoTime() has an arbitrary (possibly negative) origin, so 0 is not a safe
// "never refreshed" sentinel. Start one 5-second throttle window in the past so that the very
// first refresh() is never mistaken for a duplicate. Guarded by entryWriteLock.
private long lastRefreshTimeNanos = System.nanoTime() - TimeUnit.SECONDS.toNanos(5);
8586
@VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>();
8687
private final AtomicInteger indexTicker = new AtomicInteger();
8788
private final String authority;
@@ -441,6 +442,13 @@ void refresh() {
441442
// - then thread2 will shut down channel that thread1 will put back into circulation (after it
442443
// replaces the list)
443444
synchronized (entryWriteLock) {
445+
long now = System.nanoTime();
446+
if (now - lastRefreshTimeNanos < TimeUnit.SECONDS.toNanos(5)) {
447+
LOG.fine("Channel pool was refreshed recently, skipping duplicate refresh");
448+
return;
449+
}
450+
lastRefreshTimeNanos = now;
451+
444452
LOG.fine("Refreshing all channels");
445453
ArrayList<Entry> newEntries = new ArrayList<>(entries.get());
446454

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,14 @@ public GrpcCallContext withCredentials(Credentials newCredentials) {
211211
isDirectPath);
212212
}
213213

214+
@Override
215+
public TransportChannel getTransportChannel() {
216+
if (channel instanceof io.grpc.ManagedChannel) {
217+
return GrpcTransportChannel.create((io.grpc.ManagedChannel) channel);
218+
}
219+
return null;
220+
}
221+
214222
@Override
215223
public GrpcCallContext withTransportChannel(TransportChannel inputChannel) {
216224
Preconditions.checkNotNull(inputChannel);

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcTransportChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ public Channel getChannel() {
6666
return getManagedChannel();
6767
}
6868

69+
@Override
70+
public void refresh() {
71+
Channel channel = getChannel();
72+
if (channel instanceof ChannelPool) {
73+
((ChannelPool) channel).refresh();
74+
}
75+
}
76+
6977
@Override
7078
public void shutdown() {
7179
getManagedChannel().shutdown();

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public TimedAttemptSettings getAttemptSettings() {
116116
}
117117
}
118118

119+
@Override
120+
public RetryingContext getRetryingContext() {
121+
return retryingContext;
122+
}
123+
119124
@Override
120125
public ApiFuture<ResponseT> peekAttemptResult() {
121126
synchronized (lock) {

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/retrying/RetryingFuture.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,12 @@ public interface RetryingFuture<ResponseT> extends ApiFuture<ResponseT> {
128128
* </ul>
129129
*/
130130
ApiFuture<ResponseT> getAttemptResult();
131+
132+
/**
133+
* Returns the retrying context associated with this future, or {@code null} if none is set.
134+
*/
135+
@com.google.api.core.BetaApi("The surface for passing per operation state is not yet stable")
136+
default RetryingContext getRetryingContext() {
137+
return null;
138+
}
131139
}

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ public RetryingFuture<ResponseT> createFuture(
111111
*/
112112
@Override
113113
public ApiFuture<ResponseT> submit(RetryingFuture<ResponseT> retryingFuture) {
114+
if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment"))) {
115+
checkForFailedChannelRefresh(retryingFuture);
116+
}
117+
114118
try {
115119
ListenableFuture<ResponseT> attemptFuture =
116120
scheduler.schedule(
@@ -122,4 +126,27 @@ public ApiFuture<ResponseT> submit(RetryingFuture<ResponseT> retryingFuture) {
122126
return ApiFutures.immediateFailedFuture(e);
123127
}
124128
}
129+
130+
private void checkForFailedChannelRefresh(RetryingFuture<ResponseT> retryingFuture) {
131+
ApiFuture<ResponseT> lastAttemptResult = retryingFuture.peekAttemptResult();
132+
if (lastAttemptResult != null && lastAttemptResult.isDone()) {
133+
try {
134+
lastAttemptResult.get();
135+
} catch (java.util.concurrent.ExecutionException e) {
136+
Throwable cause = e.getCause();
137+
if (cause instanceof com.google.api.gax.rpc.UnauthenticatedException) {
138+
RetryingContext context = retryingFuture.getRetryingContext();
139+
if (context instanceof com.google.api.gax.rpc.ApiCallContext) {
140+
com.google.api.gax.rpc.TransportChannel transportChannel =
141+
((com.google.api.gax.rpc.ApiCallContext) context).getTransportChannel();
142+
if (transportChannel != null) {
143+
transportChannel.refresh();
144+
}
145+
}
146+
}
147+
} catch (Exception ignored) {
148+
// Ignore cancellations or interruptions
149+
}
150+
}
151+
}
125152
}

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ public interface ApiCallContext extends RetryingContext {
6363
/** Returns a new ApiCallContext with the given channel set. */
6464
ApiCallContext withTransportChannel(TransportChannel channel);
6565

66+
/**
67+
* Returns the {@link TransportChannel} associated with this call context, or {@code null} if none
68+
* is set.
69+
*/
70+
default TransportChannel getTransportChannel() {
71+
return null;
72+
}
73+
6674
/** Returns a new ApiCallContext with the given Endpoint Context. */
6775
ApiCallContext withEndpointContext(EndpointContext endpointContext);
6876

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ApiResultRetryAlgorithm.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<Respo
3838
/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
3939
@Override
4040
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
41+
if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment"))
42+
&& previousThrowable instanceof UnauthenticatedException) {
43+
return true;
44+
}
4145
return (previousThrowable instanceof ApiException)
4246
&& ((ApiException) previousThrowable).isRetryable();
4347
}
@@ -51,6 +55,10 @@ public boolean shouldRetry(Throwable previousThrowable, ResponseT previousRespon
5155
@Override
5256
public boolean shouldRetry(
5357
RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
58+
if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment"))
59+
&& previousThrowable instanceof UnauthenticatedException) {
60+
return true;
61+
}
5462
if (context.getRetryableCodes() != null) {
5563
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
5664
// of codes that should be retried.

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/TransportChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,12 @@ public interface TransportChannel extends BackgroundResource {
4747
* Returns an empty {@link ApiCallContext} that is compatible with this {@code TransportChannel}.
4848
*/
4949
ApiCallContext getEmptyCallContext();
50+
51+
/**
52+
* Refreshes or recreates the underlying network connections of this transport channel.
53+
*
54+
* <p>By default, this is a no-op for transports that do not require stateful connection lifecycle
55+
* management.
56+
*/
57+
default void refresh() {}
5058
}

0 commit comments

Comments
 (0)