forked from GoogleCloudPlatform/grpc-gcp-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGcpClientCall.java
More file actions
300 lines (266 loc) · 10.3 KB
/
GcpClientCall.java
File metadata and controls
300 lines (266 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.grpc;
import com.google.cloud.grpc.proto.AffinityConfig;
import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
 * A wrapper of ClientCall that can fetch the affinity key from the request/response message.
 *
 * <p>It stores the information such as method, call options, the ChannelRef which created it, etc.
 * to facilitate creating new calls. It gets the affinity key from the request/response message,
 * and defines the callback functions to manage the number of active streams and bind/unbind the
 * affinity key with the channel.
 *
 * <p>The underlying call is created lazily: every operation invoked before the first {@link
 * #sendMessage} is queued and replayed, in order, once the first message has been used to choose
 * a channel.
 */
public class GcpClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  private final MethodDescriptor<ReqT, RespT> methodDescriptor;
  private final CallOptions callOptions;
  private final GcpManagedChannel delegateChannel;
  private final AffinityConfig affinity;

  // Channel chosen for this call and the call created on it; both stay null until the first
  // sendMessage().
  private GcpManagedChannel.ChannelRef delegateChannelRef = null;
  private ClientCall<ReqT, RespT> delegateCall = null;

  // Result of checkKeys() on the first request, or on the first response when the request
  // yielded null.
  private List<String> keys = null;

  // True once the first response message has been seen.
  private boolean received = false;

  // Ensures the active stream count is decremented at most once (cancel vs. onClose).
  private final AtomicBoolean decremented = new AtomicBoolean(false);

  // Operations requested before the first sendMessage(); replayed in order when it arrives.
  @GuardedBy("this")
  private final Queue<Runnable> calls = new ArrayDeque<>();

  // True once the underlying call has been created and the queued operations replayed.
  @GuardedBy("this")
  private boolean started;

  // System.nanoTime() captured when the first message is sent; handed to the ChannelRef on close.
  private long startNanos = 0;

  /**
   * Creates a call whose channel is chosen lazily, when the first message is sent.
   *
   * @param delegateChannel the managed channel used to look up keys and channel refs
   * @param methodDescriptor the method being invoked
   * @param callOptions the options passed to the underlying call when it is created
   * @param affinity the affinity configuration consulted for BIND/UNBIND handling; a {@code
   *     null} value disables bind/unbind handling
   */
  protected GcpClientCall(
      GcpManagedChannel delegateChannel,
      MethodDescriptor<ReqT, RespT> methodDescriptor,
      CallOptions callOptions,
      AffinityConfig affinity) {
    this.methodDescriptor = methodDescriptor;
    this.callOptions = callOptions;
    this.delegateChannel = delegateChannel;
    this.affinity = affinity;
  }

  /** Starts the underlying call, or queues the start until the first message is sent. */
  @Override
  public void start(Listener<RespT> responseListener, Metadata headers) {
    checkSendMessage(() -> delegateCall.start(getListener(responseListener), headers));
  }

  /** Forwards the request, or queues it until the first message is sent. */
  @Override
  public void request(int numMessages) {
    checkSendMessage(() -> delegateCall.request(numMessages));
  }

  /** Forwards the compression setting, or queues it until the first message is sent. */
  @Override
  public void setMessageCompression(boolean enabled) {
    checkSendMessage(() -> delegateCall.setMessageCompression(enabled));
  }

  /**
   * Cancels the underlying call, or queues the cancellation until the first message is sent.
   *
   * <p>NOTE(review): when no message has been sent yet the cancellation is only queued, so it
   * takes effect only if {@link #sendMessage} is called afterwards.
   */
  @Override
  public void cancel(@Nullable String message, @Nullable Throwable cause) {
    checkSendMessage(() -> checkedCancel(message, cause));
  }

  /** Half-closes the underlying call, or queues the half-close until the first message is sent. */
  @Override
  public void halfClose() {
    checkSendMessage(() -> delegateCall.halfClose());
  }

  /**
   * Delay executing operations until call.sendMessage() is called, switch the channel, start the
   * call, do previous operations, and finally do sendMessage().
   */
  @Override
  public void sendMessage(ReqT message) {
    synchronized (this) {
      if (!started) {
        startNanos = System.nanoTime();
        // Check if the current channelRef is bound with the key and change it if necessary.
        // If no channel is bound with the key, use the least busy one.
        keys = delegateChannel.checkKeys(message, true, methodDescriptor);
        String key = null;
        if (keys != null
            && keys.size() == 1
            && delegateChannel.getChannelRef(keys.get(0)) != null) {
          key = keys.get(0);
        }
        if (affinity != null && affinity.getCommand() == AffinityConfig.Command.BIND) {
          delegateChannelRef = delegateChannel.getChannelRefForBind();
        } else {
          delegateChannelRef = delegateChannel.getChannelRef(key);
        }
        delegateChannelRef.activeStreamsCountIncr();
        // Create the client call and replay the operations queued before this message.
        delegateCall = delegateChannelRef.getChannel().newCall(methodDescriptor, callOptions);
        for (Runnable call : calls) {
          call.run();
        }
        calls.clear();
        started = true;
      }
    }
    delegateCall.sendMessage(message);
  }

  /**
   * Calls that send exactly one message should not check this method.
   *
   * @return {@code false} until the first message has been sent, then whatever the underlying
   *     call reports
   */
  @Override
  public boolean isReady() {
    synchronized (this) {
      return started && delegateCall.isReady();
    }
  }

  /**
   * May only be called after Listener#onHeaders or Listener#onClose.
   *
   * @throws IllegalStateException if no message has been sent yet, i.e. the underlying call does
   *     not exist
   */
  @Override
  public Attributes getAttributes() {
    synchronized (this) {
      if (started) {
        return delegateCall.getAttributes();
      } else {
        throw new IllegalStateException("Calling getAttributes() before sendMessage().");
      }
    }
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("delegate", delegateCall).toString();
  }

  /**
   * Cancels the underlying call and decrements the active stream count unless that has already
   * been done. Only runs once the underlying call exists (directly, or replayed by sendMessage).
   */
  private void checkedCancel(@Nullable String message, @Nullable Throwable cause) {
    if (!decremented.getAndSet(true)) {
      delegateChannelRef.activeStreamsCountDecr(startNanos, Status.CANCELLED, true);
    }
    delegateCall.cancel(message, cause);
  }

  /** Runs {@code call} immediately if the underlying call exists, otherwise queues it. */
  private void checkSendMessage(Runnable call) {
    synchronized (this) {
      if (started) {
        call.run();
      } else {
        calls.add(call);
      }
    }
  }

  /**
   * Wraps the application listener so that stream accounting and affinity key binding happen
   * before the callbacks are forwarded.
   */
  private Listener<RespT> getListener(final Listener<RespT> responseListener) {
    return new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
        responseListener) {
      // Decrement the stream number by one when the call is closed.
      @Override
      public void onClose(Status status, Metadata trailers) {
        if (!decremented.getAndSet(true)) {
          delegateChannelRef.activeStreamsCountDecr(startNanos, status, false);
        }
        // If the operation completed successfully, bind/unbind the affinity key. The affinity
        // config is treated as optional here, exactly as in sendMessage(), so that a missing
        // config cannot prevent onClose from reaching the application listener.
        if (keys != null && affinity != null && status.getCode() == Status.Code.OK) {
          if (affinity.getCommand() == AffinityConfig.Command.UNBIND) {
            delegateChannel.unbind(keys);
          } else if (affinity.getCommand() == AffinityConfig.Command.BIND) {
            delegateChannel.bind(delegateChannelRef, keys);
          }
        }
        responseListener.onClose(status, trailers);
      }

      // If the request yielded no keys, look for them in the first response message so that
      // onClose can bind them with the channelRef.
      @Override
      public void onMessage(RespT message) {
        delegateChannelRef.messageReceived();
        if (!received) {
          received = true;
          if (keys == null) {
            keys = delegateChannel.checkKeys(message, false, methodDescriptor);
          }
        }
        responseListener.onMessage(message);
      }
    };
  }

  /**
   * A simple wrapper of ClientCall.
   *
   * <p>It defines the callback function to manage the number of active streams of a ChannelRef
   * every time a call is started/closed.
   */
  public static class SimpleGcpClientCall<ReqT, RespT> extends ForwardingClientCall<ReqT, RespT> {
    private final GcpManagedChannel delegateChannel;
    private final GcpManagedChannel.ChannelRef channelRef;
    private final ClientCall<ReqT, RespT> delegateCall;
    @Nullable private final String affinityKey;
    private final boolean unbindOnComplete;
    private long startNanos = 0;

    // True once start() has incremented the active stream count of channelRef.
    private final AtomicBoolean counted = new AtomicBoolean(false);

    // Ensures the active stream count is decremented at most once (cancel vs. onClose).
    private final AtomicBoolean decremented = new AtomicBoolean(false);

    /**
     * Creates the underlying call on the given channel ref right away.
     *
     * @param delegateChannel the managed channel used to unbind the affinity key
     * @param channelRef the channel ref the call is created on and accounted against
     * @param methodDescriptor the method being invoked
     * @param callOptions the call options; the affinity key and the unbind flag are read from
     *     them
     */
    protected SimpleGcpClientCall(
        GcpManagedChannel delegateChannel,
        GcpManagedChannel.ChannelRef channelRef,
        MethodDescriptor<ReqT, RespT> methodDescriptor,
        CallOptions callOptions) {
      this.delegateChannel = delegateChannel;
      this.channelRef = channelRef;
      this.affinityKey = callOptions.getOption(GcpManagedChannel.AFFINITY_KEY);
      this.unbindOnComplete = callOptions.getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY);
      // Set the actual channel ID in callOptions so downstream interceptors can access it.
      CallOptions callOptionsWithChannelId =
          callOptions.withOption(GcpManagedChannel.CHANNEL_ID_KEY, channelRef.getId());
      this.delegateCall =
          channelRef.getChannel().newCall(methodDescriptor, callOptionsWithChannelId);
    }

    @Override
    protected ClientCall<ReqT, RespT> delegate() {
      return delegateCall;
    }

    /**
     * Counts the call as an active stream of the channel ref and starts the underlying call with
     * a listener that undoes the count when the call closes.
     */
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      startNanos = System.nanoTime();
      Listener<RespT> listener =
          new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
              responseListener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              releaseStream(status, false);
              // Unbind the affinity key when the caller explicitly requests it
              // (e.g., on terminal RPCs like Commit or Rollback) to prevent
              // unbounded growth of the affinity map.
              if (unbindOnComplete && affinityKey != null) {
                delegateChannel.unbind(Collections.singletonList(affinityKey));
              }
              super.onClose(status, trailers);
            }

            @Override
            public void onMessage(RespT message) {
              channelRef.messageReceived();
              super.onMessage(message);
            }
          };
      channelRef.activeStreamsCountIncr();
      counted.set(true);
      try {
        delegateCall.start(listener, headers);
      } catch (RuntimeException e) {
        // A call that failed to start will never reach onClose, so undo the increment here
        // rather than leaving the stream counted as active forever.
        releaseStream(Status.fromThrowable(e), true);
        throw e;
      }
    }

    /**
     * Cancels the underlying call. This may legally happen before {@link #start}; in that case
     * the active stream count was never incremented and is therefore left untouched.
     */
    @Override
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
      releaseStream(Status.CANCELLED, true);
      // Always unbind on cancel — the transaction is being abandoned.
      if (affinityKey != null) {
        delegateChannel.unbind(Collections.singletonList(affinityKey));
      }
      delegateCall.cancel(message, cause);
    }

    /**
     * Decrements the active stream count of the channel ref exactly once, and only if start()
     * incremented it. The arguments are passed through to activeStreamsCountDecr unchanged.
     */
    private void releaseStream(Status status, boolean fromClientSide) {
      if (counted.get() && !decremented.getAndSet(true)) {
        channelRef.activeStreamsCountDecr(startNanos, status, fromClientSide);
      }
    }
  }
}