Skip to content

Commit 99c6f0c

Browse files
committed
fix
1 parent ca594e4 commit 99c6f0c

2 files changed

Lines changed: 93 additions & 48 deletions

File tree

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,9 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
634634
this.deadlineMonitor = monitorDeadline(executorService, ctx.getOperationInfo().getDeadline());
635635

636636
synchronized (SessionPoolImpl.this) {
637+
if (isCancelled) {
638+
return;
639+
}
637640
if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
638641
listener.onClose(
639642
VRpcResult.createUncommitedError(

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java

Lines changed: 90 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import com.google.bigtable.v2.VirtualRpcResponse;
2323
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DebugTagTracer;
2424
import com.google.cloud.bigtable.data.v2.internal.middleware.VRpc;
25+
import com.google.errorprone.annotations.concurrent.GuardedBy;
2526
import com.google.protobuf.Message;
2627
import com.google.protobuf.MessageLite;
2728
import com.google.protobuf.util.Durations;
2829
import io.grpc.Status;
2930
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.atomic.AtomicReference;
3131
import java.util.logging.Logger;
3232
import javax.annotation.Nullable;
3333

@@ -71,7 +71,11 @@ private enum State {
7171
private VRpcListener<RespT> listener;
7272
private PeerInfo peerInfo;
7373

74-
private AtomicReference<State> state;
74+
@GuardedBy("this")
75+
private State state = State.NEW;
76+
77+
@GuardedBy("this")
78+
private Status cancelStatus = null;
7579

7680
private final DebugTagTracer debugTagTracer;
7781

@@ -84,7 +88,6 @@ public VRpcImpl(
8488
this.session = session;
8589
this.desc = desc;
8690
this.rpcId = rpcId;
87-
this.state = new AtomicReference<>(State.NEW);
8891
this.peerInfo = peerInfo;
8992
this.debugTagTracer = debugTagTracer;
9093
}
@@ -93,46 +96,62 @@ public VRpcImpl(
9396
public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
9497
this.listener = listener;
9598

96-
Status status;
99+
Status status = null;
97100
boolean retryable = true;
98101

99-
if (!state.compareAndSet(State.NEW, State.STARTED)) {
100-
status = Status.INTERNAL.withDescription("VRpc already started in state: " + state.get());
101-
retryable = false;
102-
} else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS)
103-
< TimeUnit.MILLISECONDS.toMicros(1)) {
104-
// Don't send RPCs that don't have any hope of succeeding
105-
status =
106-
Status.DEADLINE_EXCEEDED.withDescription("Remaining deadline is too short to send RPC");
107-
retryable = false;
108-
} else {
109-
Metadata vRpcMetadata =
110-
Metadata.newBuilder()
111-
.setAttemptNumber(ctx.getOperationInfo().getAttemptNumber())
112-
.setTraceparent(ctx.getTraceParent())
113-
.build();
114-
ctx.getTracer().onRequestSent(peerInfo);
115-
status =
116-
session.startRpc(
117-
this,
118-
VirtualRpcRequest.newBuilder()
119-
.setRpcId(rpcId)
120-
.setMetadata(vRpcMetadata)
121-
.setDeadline(
122-
Durations.fromNanos(
123-
ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.NANOSECONDS)))
124-
.setPayload(desc.encode(req))
125-
.build());
126-
// if status is not OK, the session might not be ready and the vRPC can be retried on a
127-
// different session
102+
synchronized (this) {
103+
if (state == State.CLOSED && cancelStatus != null) {
104+
status = cancelStatus;
105+
retryable = false;
106+
} else if (state != State.NEW) {
107+
status = Status.INTERNAL.withDescription("VRpc already started in state: " + state);
108+
retryable = false;
109+
} else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS)
110+
< TimeUnit.MILLISECONDS.toMicros(1)) {
111+
// Don't send RPCs that don't have any hope of succeeding
112+
state = State.CLOSED;
113+
status =
114+
Status.DEADLINE_EXCEEDED.withDescription("Remaining deadline is too short to send RPC");
115+
retryable = false;
116+
} else {
117+
state = State.STARTED;
118+
Metadata vRpcMetadata =
119+
Metadata.newBuilder()
120+
.setAttemptNumber(ctx.getOperationInfo().getAttemptNumber())
121+
.setTraceparent(ctx.getTraceParent())
122+
.build();
123+
ctx.getTracer().onRequestSent(peerInfo);
124+
status =
125+
session.startRpc(
126+
this,
127+
VirtualRpcRequest.newBuilder()
128+
.setRpcId(rpcId)
129+
.setMetadata(vRpcMetadata)
130+
.setDeadline(
131+
Durations.fromNanos(
132+
ctx.getOperationInfo()
133+
.getDeadline()
134+
.timeRemaining(TimeUnit.NANOSECONDS)))
135+
.setPayload(desc.encode(req))
136+
.build());
137+
138+
if (!status.isOk()) {
139+
// if status is not OK, the session might not be ready and the vRPC can be
140+
// retried on a
141+
// different session
142+
state = State.CLOSED;
143+
}
144+
}
128145
}
129146

130-
if (!status.isOk()) {
131-
debugTagTracer.checkPrecondition(
132-
state.compareAndSet(State.STARTED, State.CLOSED),
133-
"vrpc_incorrect_start_state",
134-
"VRpc has incorrect state. Expected to be started but was %s",
135-
state);
147+
if (status != null && !status.isOk()) {
148+
synchronized (this) {
149+
debugTagTracer.checkPrecondition(
150+
state == State.STARTED || state == State.CLOSED,
151+
"vrpc_incorrect_start_state",
152+
"VRpc has incorrect state. Expected to be started or closed but was %s",
153+
state);
154+
}
136155
// TODO: loop through the session executor
137156
if (retryable) {
138157
listener.onClose(VRpcResult.createUncommitedError(status));
@@ -143,19 +162,25 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
143162
}
144163

145164
void handleSessionClose(VRpcResult result) {
146-
if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
147-
logger.warning("tried to close a vRPC after it was already closed state: " + state.get());
148-
return;
165+
synchronized (this) {
166+
if (state != State.STARTED) {
167+
logger.warning("tried to close a vRPC after it was already closed state: " + state);
168+
return;
169+
}
170+
state = State.CLOSED;
149171
}
150172

151173
listener.onClose(result);
152174
}
153175

154176
void handleResponse(VirtualRpcResponse response) {
155-
if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
156-
// This can happen if the call was cancelled just before the response arrived.
157-
// Silently ignore it.
158-
return;
177+
synchronized (this) {
178+
if (state != State.STARTED) {
179+
// This can happen if the call was cancelled just before the response arrived.
180+
// Silently ignore it.
181+
return;
182+
}
183+
state = State.CLOSED;
159184
}
160185
// TODO: handle streaming
161186

@@ -186,15 +211,32 @@ void handleResponse(VirtualRpcResponse response) {
186211
}
187212

188213
void handleError(VRpcResult result) {
189-
if (state.getAndSet(State.CLOSED) == State.CLOSED) {
190-
return;
214+
synchronized (this) {
215+
if (state == State.CLOSED) {
216+
return;
217+
}
218+
state = State.CLOSED;
191219
}
192220

193221
listener.onClose(result);
194222
}
195223

196224
@Override
197225
public void cancel(@Nullable String message, @Nullable Throwable cause) {
226+
synchronized (this) {
227+
if (state == State.NEW) {
228+
state = State.CLOSED;
229+
Status status = Status.CANCELLED;
230+
if (message != null) {
231+
status = status.withDescription(message);
232+
}
233+
if (cause != null) {
234+
status = status.withCause(cause);
235+
}
236+
cancelStatus = status;
237+
return;
238+
}
239+
}
198240
session.cancelRpc(rpcId, message, cause);
199241
}
200242

0 commit comments

Comments
 (0)