Skip to content

Commit 77488b6

Browse files
committed
fix(bigtable): prevent overflow if there are lots of pending vrpcs
1 parent e5e5f14 commit 77488b6

1 file changed

Lines changed: 21 additions & 10 deletions

File tree

  • java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ private enum PoolState {
115115
@GuardedBy("this")
116116
private int consecutiveFailures = 0;
117117

118+
@GuardedBy("this")
119+
private boolean isDraining = false;
120+
118121
/**
119122
* When the client fallback to a non-session AFE session creation will return unimplemented
120123
* errors. In which case the requests should fallback to classic client instead of waiting for an
@@ -525,17 +528,25 @@ private void onSessionClose(
525528

526529
@GuardedBy("this")
527530
private void tryDrainPendingRpcs() {
528-
while (!pendingRpcs.isEmpty()) {
529-
if (pendingRpcs.peek().isCancelled) {
530-
pendingRpcs.pop();
531-
continue;
532-
}
533-
Optional<SessionHandle> handle = picker.pickSession();
534-
if (!handle.isPresent()) {
535-
break;
531+
if (isDraining) {
532+
return;
533+
}
534+
isDraining = true;
535+
try {
536+
while (!pendingRpcs.isEmpty()) {
537+
if (pendingRpcs.peek().isCancelled) {
538+
pendingRpcs.pop();
539+
continue;
540+
}
541+
Optional<SessionHandle> handle = picker.pickSession();
542+
if (!handle.isPresent()) {
543+
break;
544+
}
545+
PendingVRpc<?, ?> rpc = pendingRpcs.removeFirst();
546+
rpc.drainTo(handle.get());
536547
}
537-
PendingVRpc<?, ?> rpc = pendingRpcs.removeFirst();
538-
rpc.drainTo(handle.get());
548+
} finally {
549+
isDraining = false;
539550
}
540551
}
541552

0 commit comments

Comments
 (0)