Skip to content

Commit 643a4bb

Browse files
committed
fix(bigtable): recycle channel on consecutive new stream failures
1 parent d08691a commit 643a4bb

2 files changed

Lines changed: 126 additions & 2 deletions

File tree

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public class ChannelPoolDpImpl implements ChannelPool {
6565
private static final String DEFAULT_LOG_NAME = "pool";
6666
private static final AtomicInteger INDEX = new AtomicInteger();
6767

68+
// TODO: Move to client configuration.
69+
private static final int CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD = 5;
70+
private static final Duration INITIAL_RECYCLE_BACKOFF = Duration.ofMillis(1);
71+
private static final Duration MAX_RECYCLE_BACKOFF = Duration.ofMinutes(1);
72+
6873
private final String poolLogId;
6974

7075
@VisibleForTesting volatile int minGroups;
@@ -95,6 +100,12 @@ public class ChannelPoolDpImpl implements ChannelPool {
95100
@GuardedBy("this")
96101
private boolean closed = false;
97102

103+
@GuardedBy("this")
104+
private long lastRecycleNano = 0;
105+
106+
@GuardedBy("this")
107+
private Duration recycleBackoff = INITIAL_RECYCLE_BACKOFF;
108+
98109
public ChannelPoolDpImpl(
99110
Supplier<ManagedChannel> channelSupplier,
100111
ChannelPoolConfiguration config,
@@ -221,6 +232,8 @@ public void start(Listener responseListener, Metadata headers) {
221232
public void onBeforeSessionStart(PeerInfo peerInfo) {
222233
afeId = AfeId.extract(peerInfo);
223234
synchronized (ChannelPoolDpImpl.this) {
235+
channelWrapper.consecutiveFailures = 0;
236+
recycleBackoff = INITIAL_RECYCLE_BACKOFF;
224237
rehomeChannel(channelWrapper, afeId);
225238
sessionsPerAfeId.add(afeId);
226239
}
@@ -232,6 +245,8 @@ public void onClose(Status status, Metadata trailers) {
232245
synchronized (ChannelPoolDpImpl.this) {
233246
if (afeId != null) {
234247
sessionsPerAfeId.remove(afeId);
248+
} else if (!status.isOk() && status.getCode() != Code.CANCELLED) {
249+
channelWrapper.consecutiveFailures++;
235250
}
236251
releaseChannel(channelWrapper, status);
237252
}
@@ -306,12 +321,12 @@ private void releaseChannel(ChannelWrapper channelWrapper, Status status) {
306321
channelWrapper.group.numStreams--;
307322
channelWrapper.numOutstanding--;
308323

309-
if (shouldRecycleChannel(status)) {
324+
if (shouldRecycleChannel(channelWrapper, status)) {
310325
recycleChannel(channelWrapper);
311326
}
312327
}
313328

314-
private static boolean shouldRecycleChannel(Status status) {
329+
private static boolean shouldRecycleChannel(ChannelWrapper channelWrapper, Status status) {
315330
if (status.getCode() == Code.UNIMPLEMENTED) {
316331
return true;
317332
}
@@ -322,6 +337,10 @@ private static boolean shouldRecycleChannel(Status status) {
322337
return true;
323338
}
324339

340+
if (channelWrapper.consecutiveFailures >= CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD) {
341+
return true;
342+
}
343+
325344
return false;
326345
}
327346

@@ -332,6 +351,16 @@ private void recycleChannel(ChannelWrapper channelWrapper) {
332351
return;
333352
}
334353

354+
if (lastRecycleNano > System.nanoTime() - recycleBackoff.toNanos()) {
355+
return;
356+
}
357+
358+
lastRecycleNano = System.nanoTime();
359+
recycleBackoff = recycleBackoff.multipliedBy(2);
360+
if (recycleBackoff.compareTo(MAX_RECYCLE_BACKOFF) > 0) {
361+
recycleBackoff = MAX_RECYCLE_BACKOFF;
362+
}
363+
335364
channelWrapper.group.channels.remove(channelWrapper);
336365
channelWrapper.channel.shutdown();
337366
// Checking for starting group because we don't want to delete the starting group.
@@ -480,6 +509,7 @@ static class ChannelWrapper {
480509
private final ManagedChannel channel;
481510
private final Instant createdAt;
482511
private int numOutstanding = 0;
512+
private int consecutiveFailures = 0;
483513

484514
public ChannelWrapper(AfeChannelGroup group, ManagedChannel channel, Clock clock) {
485515
this.group = group;

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,4 +469,98 @@ void testRecycledChannelDoesNotRejoinPool() throws InterruptedException {
469469

470470
pool.close();
471471
}
472+
473+
// Verifies that a channel is recycled only once it has accumulated five consecutive
// non-OK, non-CANCELLED stream closures that never received headers (the value of
// CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD in ChannelPoolDpImpl).
@Test
474+
void testRecycleChannelOnConsecutiveFailures() {
475+
// Every supplier call hands back the same mock channel, and every call on that channel
// is the same mock ClientCall.
when(channelSupplier.get()).thenReturn(channel);
476+
when(channel.newCall(any(), any())).thenReturn(clientCall);
477+
// Capture the listener the pool passes to the underlying call so the test can drive
// callbacks on it directly. NOTE(review): `listener` is presumably an ArgumentCaptor
// declared on the test class — confirm.
doNothing().when(clientCall).start(listener.capture(), any());
478+
479+
ChannelPoolDpImpl pool =
480+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
481+
482+
// Four failures: one short of the threshold.
for (int i = 0; i < 4; i++) {
483+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
484+
.start(mock(Listener.class), new Metadata());
485+
// Close with UNAVAILABLE without ever delivering headers, i.e. the stream fails
// before a session starts.
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
486+
487+
// Should not be recycled yet
488+
verify(channel, times(0)).shutdown();
489+
// Still exactly one supplier call in total, so no replacement channel was requested.
verify(channelSupplier, times(1)).get();
490+
}
491+
492+
// 5th failure
493+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
494+
.start(mock(Listener.class), new Metadata());
495+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
496+
497+
// Now it should be recycled
498+
verify(channel, times(1)).shutdown();
499+
// The supplier has now been asked for a second channel.
verify(channelSupplier, times(2)).get();
500+
501+
pool.close();
502+
}
503+
504+
// Verifies that a stream which receives headers resets the channel's consecutive-failure
// count: four failures, one stream with headers, then four more failures must not shut the
// channel down, even though eight failures occurred in total.
@Test
505+
void testResetConsecutiveFailuresOnSuccess() {
506+
when(channelSupplier.get()).thenReturn(channel);
507+
when(channel.newCall(any(), any())).thenReturn(clientCall);
508+
// Capture the listener the pool passes to the underlying call so callbacks can be driven
// by hand below.
doNothing().when(clientCall).start(listener.capture(), any());
509+
// NOTE(review): getAttributes() is stubbed presumably because the header-handling path
// reads call attributes — confirm against the pool implementation.
doReturn(Attributes.EMPTY).when(clientCall).getAttributes();
510+
511+
ChannelPoolDpImpl pool =
512+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
513+
514+
// 4 failures
515+
for (int i = 0; i < 4; i++) {
516+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
517+
.start(mock(Listener.class), new Metadata());
518+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
519+
}
520+
// Four failures is below the threshold of five, so the channel must still be alive.
verify(channel, times(0)).shutdown();
521+
522+
// A success: onHeaders (which calls onBeforeSessionStart)
523+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
524+
.start(mock(Listener.class), new Metadata());
525+
526+
// Build response headers carrying a base64-encoded PeerInfo under PEER_INFO_KEY.
PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build();
527+
Metadata headers = new Metadata();
528+
headers.put(
529+
SessionStreamImpl.PEER_INFO_KEY,
530+
Base64.getEncoder().encodeToString(peerInfo.toByteArray()));
531+
listener.getValue().onHeaders(headers);
532+
listener.getValue().onClose(Status.OK, new Metadata());
533+
534+
// Another 4 failures - should still not recycle because counter was reset
535+
for (int i = 0; i < 4; i++) {
536+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
537+
.start(mock(Listener.class), new Metadata());
538+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
539+
}
540+
verify(channel, times(0)).shutdown();
541+
542+
pool.close();
543+
}
544+
545+
// Verifies that streams closed with CANCELLED do not count toward the consecutive-failure
// threshold: ten cancellations in a row (twice the threshold of five) must leave the
// channel in place.
@Test
546+
void testCancelledDoesNotIncrementFailures() {
547+
when(channelSupplier.get()).thenReturn(channel);
548+
when(channel.newCall(any(), any())).thenReturn(clientCall);
549+
// Capture the listener the pool passes to the underlying call so onClose can be invoked
// by hand below.
doNothing().when(clientCall).start(listener.capture(), any());
550+
551+
ChannelPoolDpImpl pool =
552+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
553+
554+
for (int i = 0; i < 10; i++) {
555+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
556+
.start(mock(Listener.class), new Metadata());
557+
// Close without headers, but with CANCELLED rather than an error status.
listener.getValue().onClose(Status.CANCELLED, new Metadata());
558+
}
559+
560+
// Should never be recycled
561+
verify(channel, times(0)).shutdown();
562+
// Only the initial supplier call happened, so no replacement channel was requested.
verify(channelSupplier, times(1)).get();
563+
564+
pool.close();
565+
}
472566
}

0 commit comments

Comments
 (0)