|
| 1 | +/* |
| 2 | + * Copyright 2019 Google LLC |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * https://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package com.google.cloud.grpc; |
| 18 | + |
| 19 | +import com.google.cloud.grpc.proto.AffinityConfig; |
| 20 | +import com.google.common.base.MoreObjects; |
| 21 | +import io.grpc.Attributes; |
| 22 | +import io.grpc.CallOptions; |
| 23 | +import io.grpc.ClientCall; |
| 24 | +import io.grpc.ForwardingClientCall; |
| 25 | +import io.grpc.ForwardingClientCallListener; |
| 26 | +import io.grpc.Metadata; |
| 27 | +import io.grpc.MethodDescriptor; |
| 28 | +import io.grpc.Status; |
| 29 | +import java.util.ArrayDeque; |
| 30 | +import java.util.Collections; |
| 31 | +import java.util.List; |
| 32 | +import java.util.Queue; |
| 33 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 34 | +import javax.annotation.Nullable; |
| 35 | +import javax.annotation.concurrent.GuardedBy; |
| 36 | + |
| 37 | +/** |
| 38 | + * A wrapper of ClientCall that can fetch the affinitykey from the request/response message. |
| 39 | + * |
| 40 | + * <p>It stores the information such as method, calloptions, the ChannelRef which created it, etc to |
| 41 | + * facilitate creating new calls. It gets the affinitykey from the request/response message, and |
| 42 | + * defines the callback functions to manage the number of active streams and bind/unbind the |
| 43 | + * affinity key with the channel. |
| 44 | + */ |
| 45 | +public class GcpClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> { |
| 46 | + private final MethodDescriptor<ReqT, RespT> methodDescriptor; |
| 47 | + private final CallOptions callOptions; |
| 48 | + private final GcpManagedChannel delegateChannel; |
| 49 | + private final AffinityConfig affinity; |
| 50 | + |
| 51 | + private GcpManagedChannel.ChannelRef delegateChannelRef = null; |
| 52 | + private ClientCall<ReqT, RespT> delegateCall = null; |
| 53 | + private List<String> keys = null; |
| 54 | + private boolean received = false; |
| 55 | + private final AtomicBoolean decremented = new AtomicBoolean(false); |
| 56 | + |
| 57 | + @GuardedBy("this") |
| 58 | + private final Queue<Runnable> calls = new ArrayDeque<>(); |
| 59 | + |
| 60 | + @GuardedBy("this") |
| 61 | + private boolean started; |
| 62 | + |
| 63 | + private long startNanos = 0; |
| 64 | + |
| 65 | + protected GcpClientCall( |
| 66 | + GcpManagedChannel delegateChannel, |
| 67 | + MethodDescriptor<ReqT, RespT> methodDescriptor, |
| 68 | + CallOptions callOptions, |
| 69 | + AffinityConfig affinity) { |
| 70 | + this.methodDescriptor = methodDescriptor; |
| 71 | + this.callOptions = callOptions; |
| 72 | + this.delegateChannel = delegateChannel; |
| 73 | + this.affinity = affinity; |
| 74 | + } |
| 75 | + |
| 76 | + @Override |
| 77 | + public void start(Listener<RespT> responseListener, Metadata headers) { |
| 78 | + checkSendMessage(() -> delegateCall.start(getListener(responseListener), headers)); |
| 79 | + } |
| 80 | + |
| 81 | + @Override |
| 82 | + public void request(int numMessages) { |
| 83 | + checkSendMessage(() -> delegateCall.request(numMessages)); |
| 84 | + } |
| 85 | + |
| 86 | + @Override |
| 87 | + public void setMessageCompression(boolean enabled) { |
| 88 | + checkSendMessage(() -> delegateCall.setMessageCompression(enabled)); |
| 89 | + } |
| 90 | + |
| 91 | + @Override |
| 92 | + public void cancel(@Nullable String message, @Nullable Throwable cause) { |
| 93 | + checkSendMessage(() -> checkedCancel(message, cause)); |
| 94 | + } |
| 95 | + |
| 96 | + @Override |
| 97 | + public void halfClose() { |
| 98 | + checkSendMessage(() -> delegateCall.halfClose()); |
| 99 | + } |
| 100 | + |
| 101 | + /** |
| 102 | + * Delay executing operations until call.sendMessage() is called, switch the channel, start the |
| 103 | + * call, do previous operations, and finally do sendMessage(). |
| 104 | + */ |
| 105 | + @Override |
| 106 | + public void sendMessage(ReqT message) { |
| 107 | + synchronized (this) { |
| 108 | + if (!started) { |
| 109 | + startNanos = System.nanoTime(); |
| 110 | + // Check if the current channelRef is bound with the key and change it if necessary. |
| 111 | + // If no channel is bound with the key, use the least busy one. |
| 112 | + keys = delegateChannel.checkKeys(message, true, methodDescriptor); |
| 113 | + String key = null; |
| 114 | + if (keys != null |
| 115 | + && keys.size() == 1 |
| 116 | + && delegateChannel.getChannelRef(keys.get(0)) != null) { |
| 117 | + key = keys.get(0); |
| 118 | + } |
| 119 | + |
| 120 | + if (affinity != null && affinity.getCommand().equals(AffinityConfig.Command.BIND)) { |
| 121 | + delegateChannelRef = delegateChannel.getChannelRefForBind(); |
| 122 | + } else { |
| 123 | + delegateChannelRef = delegateChannel.getChannelRef(key); |
| 124 | + } |
| 125 | + delegateChannelRef.activeStreamsCountIncr(); |
| 126 | + |
| 127 | + // Create the client call and do the previous operations. |
| 128 | + delegateCall = delegateChannelRef.getChannel().newCall(methodDescriptor, callOptions); |
| 129 | + for (Runnable call : calls) { |
| 130 | + call.run(); |
| 131 | + } |
| 132 | + calls.clear(); |
| 133 | + started = true; |
| 134 | + } |
| 135 | + } |
| 136 | + delegateCall.sendMessage(message); |
| 137 | + } |
| 138 | + |
| 139 | + /** Calls that send exactly one message should not check this method. */ |
| 140 | + @Override |
| 141 | + public boolean isReady() { |
| 142 | + synchronized (this) { |
| 143 | + return started && delegateCall.isReady(); |
| 144 | + } |
| 145 | + } |
| 146 | + |
| 147 | + /** May only be called after Listener#onHeaders or Listener#onClose. */ |
| 148 | + @Override |
| 149 | + public Attributes getAttributes() { |
| 150 | + synchronized (this) { |
| 151 | + if (started) { |
| 152 | + return delegateCall.getAttributes(); |
| 153 | + } else { |
| 154 | + throw new IllegalStateException("Calling getAttributes() before sendMessage()."); |
| 155 | + } |
| 156 | + } |
| 157 | + } |
| 158 | + |
| 159 | + @Override |
| 160 | + public String toString() { |
| 161 | + return MoreObjects.toStringHelper(this).add("delegate", delegateCall).toString(); |
| 162 | + } |
| 163 | + |
| 164 | + private void checkedCancel(@Nullable String message, @Nullable Throwable cause) { |
| 165 | + if (!decremented.getAndSet(true)) { |
| 166 | + delegateChannelRef.activeStreamsCountDecr(startNanos, Status.CANCELLED, true); |
| 167 | + } |
| 168 | + delegateCall.cancel(message, cause); |
| 169 | + } |
| 170 | + |
| 171 | + private void checkSendMessage(Runnable call) { |
| 172 | + synchronized (this) { |
| 173 | + if (started) { |
| 174 | + call.run(); |
| 175 | + } else { |
| 176 | + calls.add(call); |
| 177 | + } |
| 178 | + } |
| 179 | + } |
| 180 | + |
| 181 | + private Listener<RespT> getListener(final Listener<RespT> responseListener) { |
| 182 | + |
| 183 | + return new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>( |
| 184 | + responseListener) { |
| 185 | + // Decrement the stream number by one when the call is closed. |
| 186 | + @Override |
| 187 | + public void onClose(Status status, Metadata trailers) { |
| 188 | + if (!decremented.getAndSet(true)) { |
| 189 | + delegateChannelRef.activeStreamsCountDecr(startNanos, status, false); |
| 190 | + } |
| 191 | + // If the operation completed successfully, bind/unbind the affinity key. |
| 192 | + if (keys != null && status.getCode() == Status.Code.OK) { |
| 193 | + if (affinity.getCommand() == AffinityConfig.Command.UNBIND) { |
| 194 | + delegateChannel.unbind(keys); |
| 195 | + } else if (affinity.getCommand() == AffinityConfig.Command.BIND) { |
| 196 | + delegateChannel.bind(delegateChannelRef, keys); |
| 197 | + } |
| 198 | + } |
| 199 | + responseListener.onClose(status, trailers); |
| 200 | + } |
| 201 | + |
| 202 | + // If the command is "BIND", fetch the affinitykey from the response message and bind it |
| 203 | + // with the channelRef. |
| 204 | + @Override |
| 205 | + public void onMessage(RespT message) { |
| 206 | + delegateChannelRef.messageReceived(); |
| 207 | + if (!received) { |
| 208 | + received = true; |
| 209 | + if (keys == null) { |
| 210 | + keys = delegateChannel.checkKeys(message, false, methodDescriptor); |
| 211 | + } |
| 212 | + } |
| 213 | + responseListener.onMessage(message); |
| 214 | + } |
| 215 | + }; |
| 216 | + } |
| 217 | + |
| 218 | + /** |
| 219 | + * A simple wrapper of ClientCall. |
| 220 | + * |
| 221 | + * <p>It defines the callback function to manage the number of active streams of a ChannelRef |
| 222 | + * everytime a call is started/closed. |
| 223 | + */ |
| 224 | + public static class SimpleGcpClientCall<ReqT, RespT> extends ForwardingClientCall<ReqT, RespT> { |
| 225 | + |
| 226 | + private final GcpManagedChannel delegateChannel; |
| 227 | + private final GcpManagedChannel.ChannelRef channelRef; |
| 228 | + private final ClientCall<ReqT, RespT> delegateCall; |
| 229 | + @Nullable private final String affinityKey; |
| 230 | + private final boolean unbindOnComplete; |
| 231 | + private long startNanos = 0; |
| 232 | + |
| 233 | + private final AtomicBoolean decremented = new AtomicBoolean(false); |
| 234 | + |
| 235 | + protected SimpleGcpClientCall( |
| 236 | + GcpManagedChannel delegateChannel, |
| 237 | + GcpManagedChannel.ChannelRef channelRef, |
| 238 | + MethodDescriptor<ReqT, RespT> methodDescriptor, |
| 239 | + CallOptions callOptions) { |
| 240 | + this.delegateChannel = delegateChannel; |
| 241 | + this.channelRef = channelRef; |
| 242 | + this.affinityKey = callOptions.getOption(GcpManagedChannel.AFFINITY_KEY); |
| 243 | + this.unbindOnComplete = callOptions.getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY); |
| 244 | + // Set the actual channel ID in callOptions so downstream interceptors can access it. |
| 245 | + CallOptions callOptionsWithChannelId = |
| 246 | + callOptions.withOption(GcpManagedChannel.CHANNEL_ID_KEY, channelRef.getId()); |
| 247 | + this.delegateCall = |
| 248 | + channelRef.getChannel().newCall(methodDescriptor, callOptionsWithChannelId); |
| 249 | + } |
| 250 | + |
| 251 | + @Override |
| 252 | + protected ClientCall<ReqT, RespT> delegate() { |
| 253 | + return delegateCall; |
| 254 | + } |
| 255 | + |
| 256 | + @Override |
| 257 | + public void start(Listener<RespT> responseListener, Metadata headers) { |
| 258 | + startNanos = System.nanoTime(); |
| 259 | + |
| 260 | + Listener<RespT> listener = |
| 261 | + new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>( |
| 262 | + responseListener) { |
| 263 | + @Override |
| 264 | + public void onClose(Status status, Metadata trailers) { |
| 265 | + if (!decremented.getAndSet(true)) { |
| 266 | + channelRef.activeStreamsCountDecr(startNanos, status, false); |
| 267 | + } |
| 268 | + // Unbind the affinity key when the caller explicitly requests it |
| 269 | + // (e.g., on terminal RPCs like Commit or Rollback) to prevent |
| 270 | + // unbounded growth of the affinity map. |
| 271 | + if (unbindOnComplete && affinityKey != null) { |
| 272 | + delegateChannel.unbind(Collections.singletonList(affinityKey)); |
| 273 | + } |
| 274 | + super.onClose(status, trailers); |
| 275 | + } |
| 276 | + |
| 277 | + @Override |
| 278 | + public void onMessage(RespT message) { |
| 279 | + channelRef.messageReceived(); |
| 280 | + super.onMessage(message); |
| 281 | + } |
| 282 | + }; |
| 283 | + |
| 284 | + channelRef.activeStreamsCountIncr(); |
| 285 | + delegateCall.start(listener, headers); |
| 286 | + } |
| 287 | + |
| 288 | + @Override |
| 289 | + public void cancel(String message, Throwable cause) { |
| 290 | + if (!decremented.getAndSet(true)) { |
| 291 | + channelRef.activeStreamsCountDecr(startNanos, Status.CANCELLED, true); |
| 292 | + } |
| 293 | + // Always unbind on cancel — the transaction is being abandoned. |
| 294 | + if (affinityKey != null) { |
| 295 | + delegateChannel.unbind(Collections.singletonList(affinityKey)); |
| 296 | + } |
| 297 | + delegateCall.cancel(message, cause); |
| 298 | + } |
| 299 | + } |
| 300 | +} |
0 commit comments