Skip to content

Commit de263ce

Browse files
committed
fix(bigtable): fix race condition between vrpc cancel and start
1 parent ca594e4 commit de263ce

1 file changed

Lines changed: 20 additions & 1 deletion

File tree

  • java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ private enum State {
7272
private PeerInfo peerInfo;
7373

7474
private AtomicReference<State> state;
75+
private AtomicReference<Status> cancelStatus = new AtomicReference<>();
7576

7677
private final DebugTagTracer debugTagTracer;
7778

@@ -97,7 +98,12 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
9798
boolean retryable = true;
9899

99100
if (!state.compareAndSet(State.NEW, State.STARTED)) {
100-
status = Status.INTERNAL.withDescription("VRpc already started in state: " + state.get());
101+
// The vRPC was cancelled before it's started
102+
status = cancelStatus.get();
103+
if (status == null) {
104+
// Otherwise there's a double-start bug
105+
status = Status.INTERNAL.withDescription("VRpc already started in state: " + state.get());
106+
}
101107
retryable = false;
102108
} else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS)
103109
< TimeUnit.MILLISECONDS.toMicros(1)) {
@@ -195,6 +201,19 @@ void handleError(VRpcResult result) {
195201

196202
@Override
197203
public void cancel(@Nullable String message, @Nullable Throwable cause) {
204+
// Try to transition from NEW to CLOSED to capture pre-start cancellations
205+
if (state.compareAndSet(State.NEW, State.CLOSED)) {
206+
Status status = Status.CANCELLED;
207+
if (message != null) {
208+
status = status.withDescription(message);
209+
}
210+
if (cause != null) {
211+
status = status.withCause(cause);
212+
}
213+
cancelStatus.set(status);
214+
return;
215+
}
216+
// If the vRPC is already started, delegate the cancel to session
198217
session.cancelRpc(rpcId, message, cause);
199218
}
200219

0 commit comments

Comments
 (0)