-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathGrpcClientResponseImpl.java
More file actions
143 lines (129 loc) · 4.28 KB
/
GrpcClientResponseImpl.java
File metadata and controls
143 lines (129 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.client.impl;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.vertx.core.Expectation;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.internal.ContextInternal;
import io.vertx.grpc.client.GrpcClientRequest;
import io.vertx.grpc.client.GrpcClientResponse;
import io.vertx.grpc.client.InvalidStatusException;
import io.vertx.grpc.common.*;
import io.vertx.grpc.common.impl.GrpcReadStreamBase;
import io.vertx.grpc.common.impl.Http2GrpcMessageDeframer;
import java.nio.charset.StandardCharsets;
/**
 * Client-side gRPC response backed by an {@link HttpClientResponse}.
 *
 * <p>Exposes the HTTP headers and trailers as gRPC metadata, resolves the final
 * {@link GrpcStatus} when the stream ends, and decodes the optional {@code grpc-message}
 * status text.
 *
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcClientResponseImpl<Req, Resp>, Resp> implements GrpcClientResponse<Req, Resp> {

  private final GrpcClientRequestImpl<Req, Resp> request;
  private final HttpClientResponse httpResponse;
  // May be supplied up-front by the constructor; otherwise resolved from the trailers in handleEnd().
  private GrpcStatus status;
  // Lazily decoded grpc-message value, cached after the first successful lookup.
  private String statusMessage;

  /**
   * Creates a response bound to the given request and HTTP response.
   *
   * @param context the context handed to the base read stream
   * @param request the request this response answers
   * @param format the wire format handed to the base read stream and the deframer
   * @param status a status that is already known, or {@code null} to resolve it from the
   *               {@code grpc-status} trailer when the stream ends
   * @param httpResponse the underlying HTTP response
   * @param messageDecoder the decoder used for response messages
   */
  public GrpcClientResponseImpl(ContextInternal context,
                                GrpcClientRequestImpl<Req, Resp> request,
                                WireFormat format,
                                GrpcStatus status,
                                HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
    super(
      context,
      httpResponse,
      httpResponse.headers().get(GrpcHeaderNames.GRPC_ENCODING),
      format,
      new Http2GrpcMessageDeframer(httpResponse.headers().get(GrpcHeaderNames.GRPC_ENCODING), format),
      messageDecoder);
    this.request = request;
    this.httpResponse = httpResponse;
    this.status = status;
  }

  /**
   * @return the request this response belongs to
   */
  @Override
  public GrpcClientRequest<Req, Resp> request() {
    return request;
  }

  /**
   * @return the headers of the underlying HTTP response
   */
  @Override
  public MultiMap headers() {
    return httpResponse.headers();
  }

  /**
   * @return the trailers of the underlying HTTP response
   */
  @Override
  public MultiMap trailers() {
    return httpResponse.trailers();
  }

  /**
   * Completes the response: cancels the request timeout, resolves the status when it was not
   * provided at construction time, runs the base end handling, reports the status to the
   * request and cancels the request if it has not sent its trailers yet.
   */
  @Override
  protected void handleEnd() {
    request.cancelTimeout();
    if (status == null) {
      status = parseStatus(httpResponse.getTrailer("grpc-status"));
    }
    super.handleEnd();
    request.handleStatus(status);
    if (!request.isTrailersSent()) {
      request.cancel();
    }
  }

  /**
   * Converts a raw {@code grpc-status} value into a {@link GrpcStatus}.
   *
   * <p>A missing, non-numeric or unmapped value yields {@link GrpcStatus#UNKNOWN} so that a
   * misbehaving peer cannot make end-of-stream handling throw half-way through.
   *
   * @param rawStatus the trailer value, possibly {@code null}
   * @return the resolved status, never {@code null}
   */
  private static GrpcStatus parseStatus(String rawStatus) {
    if (rawStatus == null) {
      return GrpcStatus.UNKNOWN;
    }
    try {
      GrpcStatus parsed = GrpcStatus.valueOf(Integer.parseInt(rawStatus));
      // NOTE(review): valueOf(int) presumably returns null for a code it does not map - confirm.
      return parsed != null ? parsed : GrpcStatus.UNKNOWN;
    } catch (NumberFormatException ignored) {
      // Malformed trailer value: report it as UNKNOWN rather than failing the stream.
      return GrpcStatus.UNKNOWN;
    }
  }

  /**
   * @return the status, or {@code null} while it has not been resolved yet
   */
  @Override
  public GrpcStatus status() {
    return status;
  }

  /**
   * Returns the decoded {@code grpc-message} for a non-OK status.
   *
   * <p>The message is looked up in the response headers first and then in the trailers. The
   * decoded value is cached once found.
   *
   * @return the status message, or {@code null} when the status is unset or OK, or when no
   *         message was sent
   */
  @Override
  public String statusMessage() {
    if (statusMessage == null && status != null && status != GrpcStatus.OK) {
      String encoded = httpResponse.getHeader(GrpcHeaderNames.GRPC_MESSAGE);
      if (encoded == null) {
        // Not in the headers, so look for the message alongside grpc-status in the trailers.
        encoded = httpResponse.getTrailer(GrpcHeaderNames.GRPC_MESSAGE.toString());
      }
      if (encoded != null) {
        statusMessage = decodeStatusMessage(encoded);
      }
    }
    return statusMessage;
  }

  /**
   * Percent-decodes a {@code grpc-message} value as UTF-8.
   *
   * @param encoded the raw header or trailer value
   * @return the decoded text, or {@code encoded} unchanged when it is not validly encoded
   */
  private static String decodeStatusMessage(String encoded) {
    try {
      return QueryStringDecoder.decodeComponent(encoded, StandardCharsets.UTF_8);
    } catch (IllegalArgumentException ignored) {
      // Malformed escape sequence: hand back the raw form instead of dropping the message.
      return encoded;
    }
  }

  /**
   * Returns the end future of the stream, additionally failing it with an
   * {@link InvalidStatusException} when the final status is not {@link GrpcStatus#OK}.
   *
   * @return a future completed when the response ended with an OK status
   */
  @Override
  public Future<Void> end() {
    return super.end()
      .expecting(new Expectation<>() {
        @Override
        public boolean test(Void value) {
          return status() == GrpcStatus.OK;
        }
        @Override
        public Throwable describe(Void value) {
          MultiMap metadata;
          if (httpResponse.trailers().isEmpty()) { // TODO: Check if any payload has been parsed (needs GrpcReadStream modification)
            metadata = httpResponse.headers(); // trailersOnly response
          } else {
            metadata = httpResponse.trailers();
          }
          return new InvalidStatusException(GrpcStatus.OK, status(), metadata);
        }
      });
  }

  /**
   * Sets the handler receiving decoded response messages.
   *
   * <p>A message that fails to decode is not delivered to the handler; the request is
   * cancelled instead.
   *
   * @param handler the message handler, or {@code null} to clear the current one
   * @return this response
   */
  @Override
  public GrpcClientResponseImpl<Req, Resp> handler(Handler<Resp> handler) {
    if (handler != null) {
      return messageHandler(msg -> {
        Resp decoded;
        try {
          decoded = decodeMessage(msg);
        } catch (CodecException ignored) {
          // Undecodable payload: cancel the request rather than pass a bad message on.
          request.cancel();
          return;
        }
        handler.handle(decoded);
      });
    } else {
      return messageHandler(null);
    }
  }
}