|
16 | 16 |
|
17 | 17 | package com.google.apphosting.runtime.http; |
18 | 18 |
|
19 | | -import static com.google.apphosting.runtime.http.HttpApiHostClient.REQUEST_ENDPOINT; |
20 | | - |
| 19 | +import com.google.apphosting.base.protos.RuntimePb.APIRequest; |
| 20 | +import com.google.apphosting.base.protos.RuntimePb.APIResponse; |
| 21 | +import com.google.apphosting.base.protos.RuntimePb.APIResponse.ERROR; |
| 22 | +import com.google.apphosting.base.protos.RuntimePb.APIResponse.RpcError; |
| 23 | +import com.google.apphosting.base.protos.Status.StatusProto; |
| 24 | +import com.google.apphosting.base.protos.api_bytes.RemoteApiPb; |
21 | 25 | import com.google.apphosting.runtime.anyrpc.APIHostClientInterface; |
22 | | -import com.google.apphosting.runtime.http.HttpApiHostClient.Config; |
| 26 | +import com.google.apphosting.runtime.anyrpc.AnyRpcCallback; |
| 27 | +import com.google.apphosting.runtime.anyrpc.AnyRpcClientContext; |
| 28 | +import com.google.apphosting.utils.runtime.ApiProxyUtils; |
| 29 | +import com.google.auto.value.AutoValue; |
| 30 | +import com.google.common.base.Preconditions; |
| 31 | +import com.google.common.collect.ImmutableMap; |
23 | 32 | import com.google.common.flogger.GoogleLogger; |
24 | | -import com.google.common.net.HostAndPort; |
| 33 | +import com.google.protobuf.ByteString; |
| 34 | +import com.google.protobuf.CodedInputStream; |
| 35 | +import com.google.protobuf.ExtensionRegistry; |
| 36 | +import com.google.protobuf.UninitializedMessageException; |
| 37 | +import java.io.IOException; |
| 38 | +import java.util.Optional; |
25 | 39 | import java.util.OptionalInt; |
26 | 40 |
|
27 | | -/** Makes instances of {@link HttpApiHostClient}. */ |
28 | | -public class HttpApiHostClientFactory { |
| 41 | +/** A client of the APIHost service over HTTP. */ |
| 42 | +abstract class HttpApiHostClient implements APIHostClientInterface { |
29 | 43 | private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); |
30 | | - private HttpApiHostClientFactory() {} |
31 | 44 |
|
32 | 45 | /** |
33 | | - * Creates a new HttpApiHostClient instance to talk to the HTTP-based API server on the given host |
34 | | - * and port. This method is called reflectively from ApiHostClientFactory. |
35 | | - * |
36 | | - * <p>The maximum number of concurrent connections can be configured by setting the {@code |
37 | | - * APPENGINE_API_MAX_CONNECTIONS} environment variable to a positive integer. If set, this |
38 | | - * value overrides the {@code maxConcurrentRpcs} parameter. |
39 | | - * |
40 | | - * @param hostAndPort The host and port of the API server. |
41 | | - * @param maxConcurrentRpcs The default maximum number of concurrent RPCs, used if the |
42 | | - * environment variable is not set. |
43 | | - * @return A new {@link APIHostClientInterface} instance. |
| 46 | + * Extra timeout that will be used for the HTTP request. If the API timeout is 5 seconds, the HTTP |
| 47 | + * request will have a timeout of 5 + {@value #DEFAULT_EXTRA_TIMEOUT_SECONDS} seconds. Usually |
| 48 | + * another timeout will happen first, either the API timeout on the server or the TimedFuture |
| 49 | + * timeout on the client, but this one enables us to clean up the HttpClient if the server is |
| 50 | + * unresponsive. |
| 51 | + */ |
| 52 | + static final double DEFAULT_EXTRA_TIMEOUT_SECONDS = 2.0; |
| 53 | + |
| 54 | + static final ImmutableMap<String, String> HEADERS = |
| 55 | + ImmutableMap.of( |
| 56 | + "X-Google-RPC-Service-Endpoint", "app-engine-apis", |
| 57 | + "X-Google-RPC-Service-Method", "/VMRemoteAPI.CallRemoteAPI"); |
| 58 | + static final String CONTENT_TYPE_VALUE = "application/octet-stream"; |
| 59 | + static final String REQUEST_ENDPOINT = "/rpc_http"; |
| 60 | + static final String DEADLINE_HEADER = "X-Google-RPC-Service-Deadline"; |
| 61 | + |
| 62 | + private static final int UNKNOWN_ERROR_CODE = 1; |
| 63 | + |
| 64 | + // TODO: study the different limits that we have for different transports and |
| 65 | + // make them more consistent, as well as sharing definitions like this one. |
| 66 | + /** The maximum size in bytes that we will allow in a request or a response payload. */ |
| 67 | + static final int MAX_PAYLOAD = 50 * 1024 * 1024; |
| 68 | + |
| 69 | + /** |
| 70 | + * Extra bytes that we allow in the HTTP content, basically to support serializing the other proto |
| 71 | + * fields besides the payload. |
44 | 72 | */ |
45 | | - public static APIHostClientInterface create( |
46 | | - HostAndPort hostAndPort, OptionalInt maxConcurrentRpcs) { |
47 | | - String url = "http://" + hostAndPort + REQUEST_ENDPOINT; |
48 | | - String maxConnectionsEnv = System.getenv("APPENGINE_API_MAX_CONNECTIONS"); |
49 | | - if (maxConnectionsEnv != null) { |
50 | | - try { |
51 | | - int maxConnections = Integer.parseInt(maxConnectionsEnv); |
52 | | - if (maxConnections > 0) { |
53 | | - maxConcurrentRpcs = OptionalInt.of(maxConnections); |
54 | | - } |
55 | | - } catch (NumberFormatException e) { |
56 | | - logger.atWarning().withCause(e).log( |
57 | | - "Failed to parse APPENGINE_API_MAX_CONNECTIONS: %s", maxConnectionsEnv); |
58 | | - } |
59 | | - } |
60 | | - Config config = Config.builder().setMaxConnectionsPerDestination(maxConcurrentRpcs).build(); |
61 | | - return HttpApiHostClient.create(url, config); |
| 73 | + static final int EXTRA_CONTENT_BYTES = 4096; |
| 74 | + |
| 75 | + @AutoValue |
| 76 | + abstract static class Config { |
| 77 | + abstract double extraTimeoutSeconds(); |
| 78 | + |
| 79 | + /** |
| 80 | + * The maximum number of concurrent connections to the API host. |
| 81 | + * |
| 82 | + * <p>This value is used to configure both the Jetty/JDK HTTP client's connection pool limit |
| 83 | + * and, when applicable, the maximum number of threads in the client's thread pool. |
| 84 | + */ |
| 85 | + abstract OptionalInt maxConnectionsPerDestination(); |
| 86 | + |
| 87 | + /** For testing that we handle missing Content-Length correctly. */ |
| 88 | + abstract boolean ignoreContentLength(); |
| 89 | + |
| 90 | + /** |
| 91 | + * Treat {@link java.nio.channels.ClosedChannelException} as indicating cancellation. We know |
| 92 | + * that this happens occasionally in a test that generates many interrupts. But we don't know if |
| 93 | + * there are other reasons for which it might arise, so for now we do not do this in production. |
| 94 | + * |
| 95 | + * <p>See <a href="http://b/70494739#comment31">this bug</a> for further background. |
| 96 | + */ |
| 97 | + abstract boolean treatClosedChannelAsCancellation(); |
| 98 | + |
| 99 | + static Builder builder() { |
| 100 | + return new AutoValue_HttpApiHostClient_Config.Builder() |
| 101 | + .setExtraTimeoutSeconds(DEFAULT_EXTRA_TIMEOUT_SECONDS) |
| 102 | + .setIgnoreContentLength(false) |
| 103 | + .setTreatClosedChannelAsCancellation(false); |
| 104 | + } |
| 105 | + |
| 106 | + abstract Builder toBuilder(); |
| 107 | + |
| 108 | + @AutoValue.Builder |
| 109 | + abstract static class Builder { |
| 110 | + abstract Builder setMaxConnectionsPerDestination(OptionalInt value); |
| 111 | + |
| 112 | + abstract Builder setExtraTimeoutSeconds(double value); |
| 113 | + |
| 114 | + abstract Builder setIgnoreContentLength(boolean value); |
| 115 | + |
| 116 | + abstract Builder setTreatClosedChannelAsCancellation(boolean value); |
| 117 | + |
| 118 | + abstract Config build(); |
| 119 | + } |
| 120 | + } |
| 121 | + |
| 122 | + private final Config config; |
| 123 | + |
| 124 | + HttpApiHostClient(Config config) { |
| 125 | + this.config = config; |
| 126 | + } |
| 127 | + |
| 128 | + Config config() { |
| 129 | + return config; |
| 130 | + } |
| 131 | + |
| 132 | + static HttpApiHostClient create(String url, Config config) { |
| 133 | + if (System.getenv("APPENGINE_API_CALLS_USING_JDK_CLIENT") != null) { |
| 134 | + logger.atInfo().log("Using JDK HTTP client for API calls"); |
| 135 | + return JdkHttpApiHostClient.create(url, config); |
| 136 | + } else { |
| 137 | + return JettyHttpApiHostClient.create(url, config); |
| 138 | + } |
| 139 | + } |
| 140 | + |
| 141 | + static class Context implements AnyRpcClientContext { |
| 142 | + private final long startTimeMillis; |
| 143 | + |
| 144 | + private int applicationError; |
| 145 | + private String errorDetail; |
| 146 | + private StatusProto status; |
| 147 | + private Throwable exception; |
| 148 | + private Optional<Long> deadlineNanos = Optional.empty(); |
| 149 | + |
| 150 | + Context() { |
| 151 | + this.startTimeMillis = System.currentTimeMillis(); |
| 152 | + } |
| 153 | + |
| 154 | + @Override |
| 155 | + public int getApplicationError() { |
| 156 | + return applicationError; |
| 157 | + } |
| 158 | + |
| 159 | + void setApplicationError(int applicationError) { |
| 160 | + this.applicationError = applicationError; |
| 161 | + } |
| 162 | + |
| 163 | + @Override |
| 164 | + public String getErrorDetail() { |
| 165 | + return errorDetail; |
| 166 | + } |
| 167 | + |
| 168 | + void setErrorDetail(String errorDetail) { |
| 169 | + this.errorDetail = errorDetail; |
| 170 | + } |
| 171 | + |
| 172 | + @Override |
| 173 | + public Throwable getException() { |
| 174 | + return exception; |
| 175 | + } |
| 176 | + |
| 177 | + void setException(Throwable exception) { |
| 178 | + this.exception = exception; |
| 179 | + } |
| 180 | + |
| 181 | + @Override |
| 182 | + public long getStartTimeMillis() { |
| 183 | + return startTimeMillis; |
| 184 | + } |
| 185 | + |
| 186 | + @Override |
| 187 | + public StatusProto getStatus() { |
| 188 | + return status; |
| 189 | + } |
| 190 | + |
| 191 | + void setStatus(StatusProto status) { |
| 192 | + this.status = status; |
| 193 | + } |
| 194 | + |
| 195 | + @Override |
| 196 | + public void setDeadline(double seconds) { |
| 197 | + Preconditions.checkArgument(seconds >= 0); |
| 198 | + double nanos = 1_000_000_000 * seconds; |
| 199 | + Preconditions.checkArgument(nanos <= Long.MAX_VALUE); |
| 200 | + this.deadlineNanos = Optional.of((long) nanos); |
| 201 | + } |
| 202 | + |
| 203 | + Optional<Long> getDeadlineNanos() { |
| 204 | + return deadlineNanos; |
| 205 | + } |
| 206 | + |
| 207 | + @Override |
| 208 | + public void startCancel() { |
| 209 | + logger.atWarning().log("Canceling HTTP API call has no effect"); |
| 210 | + } |
| 211 | + } |
| 212 | + |
| 213 | + @Override |
| 214 | + public Context newClientContext() { |
| 215 | + return new Context(); |
| 216 | + } |
| 217 | + |
| 218 | + static void communicationFailure( |
| 219 | + Context context, String errorDetail, AnyRpcCallback<APIResponse> callback, Throwable cause) { |
| 220 | + context.setApplicationError(0); |
| 221 | + context.setErrorDetail(errorDetail); |
| 222 | + context.setStatus( |
| 223 | + StatusProto.newBuilder() |
| 224 | + .setSpace("RPC") |
| 225 | + .setCode(UNKNOWN_ERROR_CODE) |
| 226 | + .setCanonicalCode(UNKNOWN_ERROR_CODE) |
| 227 | + .setMessage(errorDetail) |
| 228 | + .build()); |
| 229 | + context.setException(cause); |
| 230 | + callback.failure(); |
| 231 | + } |
| 232 | + |
| 233 | + // This represents a timeout of our HTTP request. We don't usually expect this, because we |
| 234 | + // include a timeout in the API call which the server should respect. However, this fallback |
| 235 | + // logic ensures that we will get an appropriate and timely exception if the server is very slow |
| 236 | + // to respond for some reason. |
| 237 | + // ApiProxyImpl will normally have given up before this happens, so the main purpose of the |
| 238 | + // timeout is to free up resources from the failed HTTP request. |
| 239 | + static void timeout(AnyRpcCallback<APIResponse> callback) { |
| 240 | + APIResponse apiResponse = |
| 241 | + APIResponse.newBuilder() |
| 242 | + .setError(APIResponse.ERROR.RPC_ERROR_VALUE) |
| 243 | + .setRpcError(RpcError.DEADLINE_EXCEEDED) |
| 244 | + .build(); |
| 245 | + callback.success(apiResponse); |
| 246 | + // This is "success" in the sense that we got back a response, but one that will provoke |
| 247 | + // an ApiProxy.ApiDeadlineExceededException. |
| 248 | + } |
| 249 | + |
| 250 | + static void cancelled(AnyRpcCallback<APIResponse> callback) { |
| 251 | + APIResponse apiResponse = APIResponse.newBuilder().setError(ERROR.CANCELLED_VALUE).build(); |
| 252 | + callback.success(apiResponse); |
| 253 | + // This is "success" in the sense that we got back a response, but one that will provoke |
| 254 | + // an ApiProxy.CancelledException. |
| 255 | + } |
| 256 | + |
| 257 | + @Override |
| 258 | + public void call(AnyRpcClientContext ctx, APIRequest req, AnyRpcCallback<APIResponse> cb) { |
| 259 | + Context context = (Context) ctx; |
| 260 | + ByteString payload = req.getPb(); |
| 261 | + if (payload.size() > MAX_PAYLOAD) { |
| 262 | + requestTooBig(cb); |
| 263 | + return; |
| 264 | + } |
| 265 | + RemoteApiPb.Request requestPb = |
| 266 | + RemoteApiPb.Request.newBuilder() |
| 267 | + .setServiceName(req.getApiPackage()) |
| 268 | + .setMethod(req.getCall()) |
| 269 | + .setRequest(payload) |
| 270 | + .setRequestId(req.getSecurityTicket()) |
| 271 | + .setTraceContext(req.getTraceContext().toByteString()) |
| 272 | + .build(); |
| 273 | + send(requestPb.toByteArray(), context, cb); |
| 274 | + } |
| 275 | + |
| 276 | + static void receivedResponse( |
| 277 | + byte[] responseBytes, |
| 278 | + int responseLength, |
| 279 | + Context context, |
| 280 | + AnyRpcCallback<APIResponse> callback) { |
| 281 | + logger.atFine().log("Response size %d", responseLength); |
| 282 | + CodedInputStream input = CodedInputStream.newInstance(responseBytes, 0, responseLength); |
| 283 | + RemoteApiPb.Response responsePb; |
| 284 | + try { |
| 285 | + responsePb = RemoteApiPb.Response.parseFrom(input, ExtensionRegistry.getEmptyRegistry()); |
| 286 | + } catch (UninitializedMessageException | IOException e) { |
| 287 | + String errorDetail = "Failed to parse RemoteApiPb.Response"; |
| 288 | + logger.atWarning().withCause(e).log("%s", errorDetail); |
| 289 | + communicationFailure(context, errorDetail, callback, e); |
| 290 | + return; |
| 291 | + } |
| 292 | + |
| 293 | + if (responsePb.hasApplicationError()) { |
| 294 | + RemoteApiPb.ApplicationError applicationError = responsePb.getApplicationError(); |
| 295 | + context.setApplicationError(applicationError.getCode()); |
| 296 | + context.setErrorDetail(applicationError.getDetail()); |
| 297 | + context.setStatus(StatusProto.getDefaultInstance()); |
| 298 | + callback.failure(); |
| 299 | + return; |
| 300 | + } |
| 301 | + |
| 302 | + APIResponse apiResponse = |
| 303 | + APIResponse.newBuilder() |
| 304 | + .setError(ApiProxyUtils.remoteApiErrorToApiResponseError(responsePb).getNumber()) |
| 305 | + .setPb(responsePb.getResponse()) |
| 306 | + .build(); |
| 307 | + callback.success(apiResponse); |
| 308 | + } |
| 309 | + |
| 310 | + abstract void send(byte[] requestBytes, Context context, AnyRpcCallback<APIResponse> callback); |
| 311 | + |
| 312 | + private static void requestTooBig(AnyRpcCallback<APIResponse> cb) { |
| 313 | + APIResponse apiResponse = |
| 314 | + APIResponse.newBuilder().setError(ERROR.REQUEST_TOO_LARGE_VALUE).build(); |
| 315 | + cb.success(apiResponse); |
| 316 | + // This is "success" in the sense that we got back a response, but one that will provoke |
| 317 | + // an ApiProxy.RequestTooLargeException. |
| 318 | + } |
| 319 | + |
| 320 | + static void responseTooBig(AnyRpcCallback<APIResponse> cb) { |
| 321 | + APIResponse apiResponse = |
| 322 | + APIResponse.newBuilder().setError(ERROR.RESPONSE_TOO_LARGE_VALUE).build(); |
| 323 | + cb.success(apiResponse); |
| 324 | + // This is "success" in the sense that we got back a response, but one that will provoke |
| 325 | + // an ApiProxy.ResponseTooLargeException. |
62 | 326 | } |
63 | 327 | } |
0 commit comments