2222import com .google .bigtable .v2 .VirtualRpcResponse ;
2323import com .google .cloud .bigtable .data .v2 .internal .csm .tracers .DebugTagTracer ;
2424import com .google .cloud .bigtable .data .v2 .internal .middleware .VRpc ;
25+ import com .google .errorprone .annotations .concurrent .GuardedBy ;
2526import com .google .protobuf .Message ;
2627import com .google .protobuf .MessageLite ;
2728import com .google .protobuf .util .Durations ;
2829import io .grpc .Status ;
2930import java .util .concurrent .TimeUnit ;
30- import java .util .concurrent .atomic .AtomicReference ;
3131import java .util .logging .Logger ;
3232import javax .annotation .Nullable ;
3333
@@ -71,7 +71,11 @@ private enum State {
7171 private VRpcListener <RespT > listener ;
7272 private PeerInfo peerInfo ;
7373
74- private AtomicReference <State > state ;
74+ @ GuardedBy ("this" )
75+ private State state = State .NEW ;
76+
77+ @ GuardedBy ("this" )
78+ private Status cancelStatus = null ;
7579
7680 private final DebugTagTracer debugTagTracer ;
7781
@@ -84,7 +88,6 @@ public VRpcImpl(
8488 this .session = session ;
8589 this .desc = desc ;
8690 this .rpcId = rpcId ;
87- this .state = new AtomicReference <>(State .NEW );
8891 this .peerInfo = peerInfo ;
8992 this .debugTagTracer = debugTagTracer ;
9093 }
@@ -93,46 +96,62 @@ public VRpcImpl(
9396 public void start (ReqT req , VRpcCallContext ctx , VRpcListener <RespT > listener ) {
9497 this .listener = listener ;
9598
96- Status status ;
99+ Status status = null ;
97100 boolean retryable = true ;
98101
99- if (!state .compareAndSet (State .NEW , State .STARTED )) {
100- status = Status .INTERNAL .withDescription ("VRpc already started in state: " + state .get ());
101- retryable = false ;
102- } else if (ctx .getOperationInfo ().getDeadline ().timeRemaining (TimeUnit .MICROSECONDS )
103- < TimeUnit .MILLISECONDS .toMicros (1 )) {
104- // Don't send RPCs that don't have any hope of succeeding
105- status =
106- Status .DEADLINE_EXCEEDED .withDescription ("Remaining deadline is too short to send RPC" );
107- retryable = false ;
108- } else {
109- Metadata vRpcMetadata =
110- Metadata .newBuilder ()
111- .setAttemptNumber (ctx .getOperationInfo ().getAttemptNumber ())
112- .setTraceparent (ctx .getTraceParent ())
113- .build ();
114- ctx .getTracer ().onRequestSent (peerInfo );
115- status =
116- session .startRpc (
117- this ,
118- VirtualRpcRequest .newBuilder ()
119- .setRpcId (rpcId )
120- .setMetadata (vRpcMetadata )
121- .setDeadline (
122- Durations .fromNanos (
123- ctx .getOperationInfo ().getDeadline ().timeRemaining (TimeUnit .NANOSECONDS )))
124- .setPayload (desc .encode (req ))
125- .build ());
126- // if status is not OK, the session might not be ready and the vRPC can be retried on a
127- // different session
102+ synchronized (this ) {
103+ if (state == State .CLOSED && cancelStatus != null ) {
104+ status = cancelStatus ;
105+ retryable = false ;
106+ } else if (state != State .NEW ) {
107+ status = Status .INTERNAL .withDescription ("VRpc already started in state: " + state );
108+ retryable = false ;
109+ } else if (ctx .getOperationInfo ().getDeadline ().timeRemaining (TimeUnit .MICROSECONDS )
110+ < TimeUnit .MILLISECONDS .toMicros (1 )) {
111+ // Don't send RPCs that don't have any hope of succeeding
112+ state = State .CLOSED ;
113+ status =
114+ Status .DEADLINE_EXCEEDED .withDescription ("Remaining deadline is too short to send RPC" );
115+ retryable = false ;
116+ } else {
117+ state = State .STARTED ;
118+ Metadata vRpcMetadata =
119+ Metadata .newBuilder ()
120+ .setAttemptNumber (ctx .getOperationInfo ().getAttemptNumber ())
121+ .setTraceparent (ctx .getTraceParent ())
122+ .build ();
123+ ctx .getTracer ().onRequestSent (peerInfo );
124+ status =
125+ session .startRpc (
126+ this ,
127+ VirtualRpcRequest .newBuilder ()
128+ .setRpcId (rpcId )
129+ .setMetadata (vRpcMetadata )
130+ .setDeadline (
131+ Durations .fromNanos (
132+ ctx .getOperationInfo ()
133+ .getDeadline ()
134+ .timeRemaining (TimeUnit .NANOSECONDS )))
135+ .setPayload (desc .encode (req ))
136+ .build ());
137+
138+ if (!status .isOk ()) {
139+ // if status is not OK, the session might not be ready and the vRPC can be
140+ // retried on a
141+ // different session
142+ state = State .CLOSED ;
143+ }
144+ }
128145 }
129146
130- if (!status .isOk ()) {
131- debugTagTracer .checkPrecondition (
132- state .compareAndSet (State .STARTED , State .CLOSED ),
133- "vrpc_incorrect_start_state" ,
134- "VRpc has incorrect state. Expected to be started but was %s" ,
135- state );
147+ if (status != null && !status .isOk ()) {
148+ synchronized (this ) {
149+ debugTagTracer .checkPrecondition (
150+ state == State .STARTED || state == State .CLOSED ,
151+ "vrpc_incorrect_start_state" ,
152+ "VRpc has incorrect state. Expected to be started or closed but was %s" ,
153+ state );
154+ }
136155 // TODO: loop through the session executor
137156 if (retryable ) {
138157 listener .onClose (VRpcResult .createUncommitedError (status ));
@@ -143,19 +162,25 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
143162 }
144163
145164 void handleSessionClose (VRpcResult result ) {
146- if (!state .compareAndSet (State .STARTED , State .CLOSED )) {
147- logger .warning ("tried to close a vRPC after it was already closed state: " + state .get ());
148- return ;
165+ synchronized (this ) {
166+ if (state != State .STARTED ) {
167+ logger .warning ("tried to close a vRPC after it was already closed state: " + state );
168+ return ;
169+ }
170+ state = State .CLOSED ;
149171 }
150172
151173 listener .onClose (result );
152174 }
153175
154176 void handleResponse (VirtualRpcResponse response ) {
155- if (!state .compareAndSet (State .STARTED , State .CLOSED )) {
156- // This can happen if the call was cancelled just before the response arrived.
157- // Silently ignore it.
158- return ;
177+ synchronized (this ) {
178+ if (state != State .STARTED ) {
179+ // This can happen if the call was cancelled just before the response arrived.
180+ // Silently ignore it.
181+ return ;
182+ }
183+ state = State .CLOSED ;
159184 }
160185 // TODO: handle streaming
161186
@@ -186,15 +211,32 @@ void handleResponse(VirtualRpcResponse response) {
186211 }
187212
188213 void handleError (VRpcResult result ) {
189- if (state .getAndSet (State .CLOSED ) == State .CLOSED ) {
190- return ;
214+ synchronized (this ) {
215+ if (state == State .CLOSED ) {
216+ return ;
217+ }
218+ state = State .CLOSED ;
191219 }
192220
193221 listener .onClose (result );
194222 }
195223
196224 @ Override
197225 public void cancel (@ Nullable String message , @ Nullable Throwable cause ) {
226+ synchronized (this ) {
227+ if (state == State .NEW ) {
228+ state = State .CLOSED ;
229+ Status status = Status .CANCELLED ;
230+ if (message != null ) {
231+ status = status .withDescription (message );
232+ }
233+ if (cause != null ) {
234+ status = status .withCause (cause );
235+ }
236+ cancelStatus = status ;
237+ return ;
238+ }
239+ }
198240 session .cancelRpc (rpcId , message , cause );
199241 }
200242
0 commit comments