Skip to content

Commit 50d50df

Browse files
committed
fix(bigtable): recycle channel on consecutive new stream failures
1 parent d08691a commit 50d50df

2 files changed

Lines changed: 121 additions & 2 deletions

File tree

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ public class ChannelPoolDpImpl implements ChannelPool {
6565
private static final String DEFAULT_LOG_NAME = "pool";
6666
private static final AtomicInteger INDEX = new AtomicInteger();
6767

68+
// TODO: Move to client configuration.
69+
private static final int CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD = 5;
70+
6871
private final String poolLogId;
6972

7073
@VisibleForTesting volatile int minGroups;
@@ -95,6 +98,12 @@ public class ChannelPoolDpImpl implements ChannelPool {
9598
@GuardedBy("this")
9699
private boolean closed = false;
97100

101+
@GuardedBy("this")
102+
private long lastRecycleNano = 0;
103+
104+
@GuardedBy("this")
105+
private Duration recycleBackoff = Duration.ofMillis(1);
106+
98107
public ChannelPoolDpImpl(
99108
Supplier<ManagedChannel> channelSupplier,
100109
ChannelPoolConfiguration config,
@@ -221,6 +230,8 @@ public void start(Listener responseListener, Metadata headers) {
221230
public void onBeforeSessionStart(PeerInfo peerInfo) {
222231
afeId = AfeId.extract(peerInfo);
223232
synchronized (ChannelPoolDpImpl.this) {
233+
channelWrapper.consecutiveFailures = 0;
234+
recycleBackoff = Duration.ofMillis(1);
224235
rehomeChannel(channelWrapper, afeId);
225236
sessionsPerAfeId.add(afeId);
226237
}
@@ -232,6 +243,8 @@ public void onClose(Status status, Metadata trailers) {
232243
synchronized (ChannelPoolDpImpl.this) {
233244
if (afeId != null) {
234245
sessionsPerAfeId.remove(afeId);
246+
} else if (!status.isOk() && status.getCode() != Code.CANCELLED) {
247+
channelWrapper.consecutiveFailures++;
235248
}
236249
releaseChannel(channelWrapper, status);
237250
}
@@ -306,12 +319,12 @@ private void releaseChannel(ChannelWrapper channelWrapper, Status status) {
306319
channelWrapper.group.numStreams--;
307320
channelWrapper.numOutstanding--;
308321

309-
if (shouldRecycleChannel(status)) {
322+
if (shouldRecycleChannel(channelWrapper, status)) {
310323
recycleChannel(channelWrapper);
311324
}
312325
}
313326

314-
private static boolean shouldRecycleChannel(Status status) {
327+
private static boolean shouldRecycleChannel(ChannelWrapper channelWrapper, Status status) {
315328
if (status.getCode() == Code.UNIMPLEMENTED) {
316329
return true;
317330
}
@@ -322,6 +335,10 @@ private static boolean shouldRecycleChannel(Status status) {
322335
return true;
323336
}
324337

338+
if (channelWrapper.consecutiveFailures >= CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD) {
339+
return true;
340+
}
341+
325342
return false;
326343
}
327344

@@ -332,6 +349,13 @@ private void recycleChannel(ChannelWrapper channelWrapper) {
332349
return;
333350
}
334351

352+
if (lastRecycleNano > System.nanoTime() - recycleBackoff.toNanos()) {
353+
return;
354+
}
355+
356+
lastRecycleNano = System.nanoTime();
357+
recycleBackoff = recycleBackoff.multipliedBy(2);
358+
335359
channelWrapper.group.channels.remove(channelWrapper);
336360
channelWrapper.channel.shutdown();
337361
// Checking for starting group because we don't want to delete the starting group.
@@ -480,6 +504,7 @@ static class ChannelWrapper {
480504
private final ManagedChannel channel;
481505
private final Instant createdAt;
482506
private int numOutstanding = 0;
507+
private int consecutiveFailures = 0;
483508

484509
public ChannelWrapper(AfeChannelGroup group, ManagedChannel channel, Clock clock) {
485510
this.group = group;

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,4 +469,98 @@ void testRecycledChannelDoesNotRejoinPool() throws InterruptedException {
469469

470470
pool.close();
471471
}
472+
473+
/**
 * Closes five streams in a row with {@code Status.UNAVAILABLE} without any of them receiving
 * headers. After each of the first four closes the test asserts that the channel has not been
 * shut down and the channel supplier has been called only once; after the fifth close it asserts
 * exactly one {@code shutdown()} and a second supplier call.
 */
@Test
474+
void testRecycleChannelOnConsecutiveFailures() {
475+
when(channelSupplier.get()).thenReturn(channel);
476+
when(channel.newCall(any(), any())).thenReturn(clientCall);
477+
doNothing().when(clientCall).start(listener.capture(), any());
478+
479+
ChannelPoolDpImpl pool =
480+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
481+
482+
for (int i = 0; i < 4; i++) {
483+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
484+
.start(mock(Listener.class), new Metadata());
485+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
486+
487+
// Should not be recycled yet
488+
verify(channel, times(0)).shutdown();
489+
verify(channelSupplier, times(1)).get();
490+
}
491+
492+
// 5th failure
493+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
494+
.start(mock(Listener.class), new Metadata());
495+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
496+
497+
// Now it should be recycled
498+
verify(channel, times(1)).shutdown();
499+
verify(channelSupplier, times(2)).get();
500+
501+
pool.close();
502+
}
503+
504+
/**
 * Closes four streams with {@code Status.UNAVAILABLE}, then runs one stream that receives headers
 * carrying an encoded {@code PeerInfo} and closes with {@code Status.OK}, then closes four more
 * streams with {@code Status.UNAVAILABLE}. Asserts that {@code channel.shutdown()} is never
 * invoked, i.e. the eight failures are not counted as one consecutive run.
 */
@Test
505+
void testResetConsecutiveFailuresOnSuccess() {
506+
when(channelSupplier.get()).thenReturn(channel);
507+
when(channel.newCall(any(), any())).thenReturn(clientCall);
508+
doNothing().when(clientCall).start(listener.capture(), any());
509+
doReturn(Attributes.EMPTY).when(clientCall).getAttributes();
510+
511+
ChannelPoolDpImpl pool =
512+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
513+
514+
// 4 failures
515+
for (int i = 0; i < 4; i++) {
516+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
517+
.start(mock(Listener.class), new Metadata());
518+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
519+
}
520+
verify(channel, times(0)).shutdown();
521+
522+
// A success: onHeaders (which calls onBeforeSessionStart)
523+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
524+
.start(mock(Listener.class), new Metadata());
525+
526+
PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build();
527+
Metadata headers = new Metadata();
528+
headers.put(
529+
SessionStreamImpl.PEER_INFO_KEY,
530+
Base64.getEncoder().encodeToString(peerInfo.toByteArray()));
531+
listener.getValue().onHeaders(headers);
532+
listener.getValue().onClose(Status.OK, new Metadata());
533+
534+
// Another 4 failures - should still not recycle because counter was reset
535+
for (int i = 0; i < 4; i++) {
536+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
537+
.start(mock(Listener.class), new Metadata());
538+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
539+
}
540+
verify(channel, times(0)).shutdown();
541+
542+
pool.close();
543+
}
544+
545+
/**
 * Closes ten streams in a row with {@code Status.CANCELLED} and asserts that the channel is never
 * shut down and the channel supplier is called only once.
 */
@Test
546+
void testCancelledDoesNotIncrementFailures() {
547+
when(channelSupplier.get()).thenReturn(channel);
548+
when(channel.newCall(any(), any())).thenReturn(clientCall);
549+
doNothing().when(clientCall).start(listener.capture(), any());
550+
551+
ChannelPoolDpImpl pool =
552+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
553+
554+
for (int i = 0; i < 10; i++) {
555+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
556+
.start(mock(Listener.class), new Metadata());
557+
listener.getValue().onClose(Status.CANCELLED, new Metadata());
558+
}
559+
560+
// Should never be recycled
561+
verify(channel, times(0)).shutdown();
562+
verify(channelSupplier, times(1)).get();
563+
564+
pool.close();
565+
}
472566
}

0 commit comments

Comments
 (0)