Skip to content

Commit e14eb71

Browse files
refactor: Step 10 — route PendingVRpc per-op state through op executor
isCancelled and realCall are now exclusively owned by ctx.getExecutor() (the per-op SerializingExecutor), eliminating the cancel/drainTo race that required the NOOP_CALL sentinel. cancel() eagerly removes from pendingRpcs under the pool lock, then dispatches the state transition to the op executor. drainTo() cancels the deadline monitor before dispatching, and checks isCancelled on the op executor so a cancelled-before-drain rpc returns its session cleanly via onVRpcComplete(). popClosableRpcs() and tryDrainPendingRpcs() are simplified now that cancelled rpcs are removed eagerly. close() copies pendingRpcs before clearing to avoid ConcurrentModificationException when cancel() re-acquires the pool lock. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1a5bf3b commit e14eb71

1 file changed

Lines changed: 40 additions & 54 deletions

File tree

  • java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java

Lines changed: 40 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ public SessionPoolInfo getInfo() {
245245
public void close(CloseSessionRequest req) {
246246
configListenerHandle.close();
247247

248+
List<PendingVRpc<?, ?>> toCancel;
248249
synchronized (this) {
249250
if (poolState == PoolState.CLOSED) {
250251
logger.fine(String.format("Tried to close a closed SessionPool %s", info.getLogName()));
@@ -254,9 +255,8 @@ public void close(CloseSessionRequest req) {
254255

255256
poolState = PoolState.CLOSED;
256257

257-
for (PendingVRpc<?, ?> pendingRpc : pendingRpcs) {
258-
pendingRpc.cancel("SessionPool closed: " + req, null);
259-
}
258+
toCancel = new ArrayList<>(pendingRpcs);
259+
pendingRpcs.clear();
260260
afeListPruneTask.cancel(false);
261261
if (retryCreateSessionFuture != null) {
262262
retryCreateSessionFuture.cancel(false);
@@ -265,6 +265,10 @@ public void close(CloseSessionRequest req) {
265265
watchdog.close();
266266
sessions.close(req);
267267
}
268+
269+
for (PendingVRpc<?, ?> pendingRpc : toCancel) {
270+
pendingRpc.cancel("SessionPool closed: " + req, null);
271+
}
268272
}
269273

270274
@Override
@@ -506,7 +510,7 @@ private void onSessionClose(
506510
status, trailers)));
507511
for (PendingVRpc<?, ?> vrpc : toBeClosed) {
508512
try {
509-
vrpc.ctx.getExecutor().execute(() -> vrpc.listener.onClose(result));
513+
vrpc.cancelWithResult(result);
510514
} catch (Throwable t) {
511515
logger.log(Level.WARNING, "Exception when closing request", t);
512516
}
@@ -517,10 +521,6 @@ private void onSessionClose(
517521
@GuardedBy("this")
518522
private void tryDrainPendingRpcs() {
519523
while (!pendingRpcs.isEmpty()) {
520-
if (pendingRpcs.peek().isCancelled) {
521-
pendingRpcs.pop();
522-
continue;
523-
}
524524
Optional<SessionHandle> handle = picker.pickSession();
525525
if (!handle.isPresent()) {
526526
break;
@@ -536,11 +536,8 @@ private void tryDrainPendingRpcs() {
536536
Iterator<PendingVRpc<?, ?>> iter = pendingRpcs.iterator();
537537
while (iter.hasNext()) {
538538
PendingVRpc<?, ?> vrpc = iter.next();
539-
// vrpcs that have started on a session gets closed in SessionImpl. Do not double close.
540-
if (!vrpc.isCancelled && vrpc.realCall == null) {
541-
iter.remove();
542-
toBeClosed.add(vrpc);
543-
}
539+
iter.remove();
540+
toBeClosed.add(vrpc);
544541
}
545542
return toBeClosed;
546543
}
@@ -561,7 +558,6 @@ public synchronized <ReqT extends Message, RespT extends Message> VRpc<ReqT, Res
561558
return new PendingVRpc<>(desc);
562559
}
563560

564-
@GuardedBy("this")
565561
private <ReqT extends Message, RespT extends Message> VRpc<ReqT, RespT> newRealCall(
566562
VRpcDescriptor<?, ReqT, RespT> desc, SessionHandle handle) {
567563

@@ -654,9 +650,6 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
654650
}
655651
}
656652

657-
// It's safe to call cancel on a vrpc more than once. It'll be a noop after the initial
658-
// call. Cancelled vrpcs are removed from the pending vrpc queue the next time we
659-
// drain the queue.
660653
@Override
661654
public void cancel(@Nullable String message, @Nullable Throwable cause) {
662655
Status status = Status.CANCELLED;
@@ -669,30 +662,31 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
669662
cancel(status, false);
670663
}
671664

672-
// Cancel could race with drainTo which sets the real call. Assign realCall to a NOOP_CALL
673-
// so if drainTo gets called at the same time, it'll just get swallowed and we're only calling
674-
// onClose once on the listener. The cancel could also come from deadline monitor when
675-
// the deadline expires. In this case if the real call is already set, we want to real call
676-
// to handle the deadline and return early.
665+
// cancel() and drainTo() are sequenced via ctx.getExecutor() (a per-op SerializingExecutor),
666+
// so isCancelled and realCall are owned exclusively by that executor — no pool lock needed.
677667
private void cancel(Status status, boolean onlyCancelPendingCall) {
678-
boolean delegateToRealCall = true;
679668
synchronized (SessionPoolImpl.this) {
680-
if (isCancelled) {
681-
return;
682-
}
669+
pendingRpcs.remove(this); // eager removal; no-op if already drained
670+
}
671+
ctx.getExecutor().execute(() -> {
672+
if (isCancelled) return;
683673
isCancelled = true;
684-
if (realCall == null) {
685-
this.realCall = NOOP_CALL;
686-
delegateToRealCall = false;
687-
} else if (onlyCancelPendingCall) {
688-
return;
674+
if (realCall != null) {
675+
if (!onlyCancelPendingCall) {
676+
realCall.cancel(status.getDescription(), status.getCause());
677+
}
678+
} else {
679+
listener.onClose(VRpcResult.createRejectedError(status));
689680
}
690-
}
691-
if (delegateToRealCall) {
692-
realCall.cancel(status.getDescription(), status.getCause());
693-
} else {
694-
ctx.getExecutor().execute(() -> listener.onClose(VRpcResult.createRejectedError(status)));
695-
}
681+
});
682+
}
683+
684+
void cancelWithResult(VRpcResult result) {
685+
ctx.getExecutor().execute(() -> {
686+
if (isCancelled) return;
687+
isCancelled = true;
688+
listener.onClose(result);
689+
});
696690
}
697691

698692
@Override
@@ -705,15 +699,18 @@ public void requestNext() {
705699
}
706700

707701
private void drainTo(SessionHandle handle) {
708-
synchronized (SessionPoolImpl.this) {
709-
if (realCall == null) {
710-
this.realCall = newRealCall(desc, handle);
711-
}
712-
}
713-
this.realCall.start(req, ctx, listener);
714702
if (deadlineMonitor != null) {
715703
deadlineMonitor.cancel(false);
716704
}
705+
ctx.getExecutor().execute(() -> {
706+
if (isCancelled) {
707+
SessionPoolImpl.this.onVRpcComplete(
708+
handle, Duration.ZERO, VRpcResult.createRejectedError(Status.CANCELLED));
709+
return;
710+
}
711+
realCall = newRealCall(desc, handle);
712+
realCall.start(req, ctx, listener);
713+
});
717714
}
718715

719716
private VRpcListener<RespT> getListener() {
@@ -828,15 +825,4 @@ public void close() {
828825
}
829826
}
830827

831-
private static final VRpc NOOP_CALL =
832-
new VRpc() {
833-
@Override
834-
public void start(Object req, VRpcCallContext ctx, VRpcListener listener) {}
835-
836-
@Override
837-
public void cancel(@Nullable String message, @Nullable Throwable cause) {}
838-
839-
@Override
840-
public void requestNext() {}
841-
};
842828
}

0 commit comments

Comments
 (0)