Skip to content

Commit f450611

Browse files
committed
bugfix: 修复http协议偶发串包问题
1 parent 91ebc00 commit f450611

6 files changed

Lines changed: 1216 additions & 65 deletions

File tree

trpc-core/src/main/java/com/tencent/trpc/core/common/config/ClientConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* Tencent is pleased to support the open source community by making tRPC available.
33
*
4-
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
4+
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
55
* All rights reserved.
66
*
77
* If you have downloaded a copy of the tRPC source code from Tencent,
@@ -32,7 +32,7 @@
3232
public class ClientConfig extends BaseProtocolConfig {
3333

3434
private static final Logger logger = LoggerFactory.getLogger(ClientConfig.class);
35-
35+
3636
@ConfigProperty
3737
protected String namespace;
3838

@@ -75,7 +75,7 @@ public class ClientConfig extends BaseProtocolConfig {
7575
/**
7676
* BackendConfig mapping.
7777
*/
78-
protected Map<String, BackendConfig> backendConfigMap = Maps.newHashMap();
78+
protected Map<String, BackendConfig> backendConfigMap = Maps.newConcurrentMap();
7979

8080
/**
8181
* Whether the service is registered.

trpc-core/src/test/java/com/tencent/trpc/core/common/config/ClientConfigTest.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,54 @@ public void testClientConfigNotEmptryBackendConfig() {
227227
assertEquals(false, backendConfig.isIoThreadGroupShare());
228228
assertEquals(1000, backendConfig.getIoThreads());
229229
}
230-
}
230+
231+
@Test
232+
public void testInitAndStop() {
233+
ClientConfig config = new ClientConfig();
234+
config.init();
235+
assertTrue(config.isInitialized());
236+
config.init();
237+
assertTrue(config.isInitialized());
238+
config.stop();
239+
config.stop();
240+
}
241+
242+
@Test
243+
public void testGetBackendConfig() {
244+
ClientConfig config = new ClientConfig();
245+
BackendConfig backendConfig = new BackendConfig();
246+
backendConfig.setName("svc");
247+
backendConfig.setNamingUrl("a://b");
248+
config.addBackendConfig(backendConfig);
249+
assertEquals(backendConfig, config.getBackendConfig("svc"));
250+
}
251+
252+
@Test
253+
public void testSetterAfterInitThrows() {
254+
ClientConfig config = new ClientConfig();
255+
config.init();
256+
Exception ex = null;
257+
try {
258+
config.setNamespace("ns");
259+
} catch (Exception e) {
260+
ex = e;
261+
}
262+
assertTrue(ex instanceof IllegalArgumentException);
263+
}
264+
265+
@Test
266+
public void testSetBackendConfigMap() {
267+
ClientConfig config = new ClientConfig();
268+
java.util.Map<String, BackendConfig> map = new java.util.HashMap<>();
269+
config.setBackendConfigMap(map);
270+
assertEquals(map, config.getBackendConfigMap());
271+
}
272+
273+
@Test
274+
public void testSetDefaultIdempotent() {
275+
ClientConfig config = new ClientConfig();
276+
config.setDefault();
277+
config.setDefault();
278+
assertTrue(config.isSetDefault());
279+
}
280+
}

trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,8 @@ private Response handleResponse(Request request, CloseableHttpResponse httpRespo
9393
Map<String, Object> respAttachments = new HashMap<>();
9494
for (Header header : httpResponse.getAllHeaders()) {
9595
String name = header.getName();
96-
for (HeaderElement element : header.getElements()) {
97-
String value = element.getName();
98-
respAttachments.put(name, value.getBytes(StandardCharsets.UTF_8));
99-
}
96+
String value = header.getValue();
97+
respAttachments.put(name, value.getBytes(StandardCharsets.UTF_8));
10098
}
10199

102100
Header contentLengthHdr = httpResponse.getFirstHeader(HttpHeaders.CONTENT_LENGTH);

trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutor.java

Lines changed: 88 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* Tencent is pleased to support the open source community by making tRPC available.
33
*
4-
* Copyright (C) 2023 THL A29 Limited, a Tencent company.
4+
* Copyright (C) 2023 Tencent.
55
* All rights reserved.
66
*
77
* If you have downloaded a copy of the tRPC source code from Tencent,
@@ -19,11 +19,11 @@
1919
import com.tencent.trpc.core.exception.TRpcException;
2020
import com.tencent.trpc.core.logger.Logger;
2121
import com.tencent.trpc.core.logger.LoggerFactory;
22+
import com.tencent.trpc.core.rpc.RpcContext;
2223
import com.tencent.trpc.core.rpc.CallInfo;
2324
import com.tencent.trpc.core.rpc.ProviderInvoker;
2425
import com.tencent.trpc.core.rpc.RequestMeta;
2526
import com.tencent.trpc.core.rpc.Response;
26-
import com.tencent.trpc.core.rpc.RpcContext;
2727
import com.tencent.trpc.core.rpc.RpcInvocation;
2828
import com.tencent.trpc.core.rpc.RpcServerContext;
2929
import com.tencent.trpc.core.rpc.common.RpcMethodInfo;
@@ -38,17 +38,19 @@
3838
import com.tencent.trpc.proto.http.common.RpcServerContextWithHttp;
3939
import com.tencent.trpc.proto.http.common.TrpcServletRequestWrapper;
4040
import com.tencent.trpc.proto.http.common.TrpcServletResponseWrapper;
41-
import jakarta.servlet.http.HttpServletRequest;
42-
import jakarta.servlet.http.HttpServletResponse;
4341
import java.lang.reflect.ParameterizedType;
4442
import java.lang.reflect.Type;
4543
import java.net.InetSocketAddress;
4644
import java.nio.charset.StandardCharsets;
4745
import java.util.Enumeration;
4846
import java.util.Map;
47+
import java.util.concurrent.CompletableFuture;
48+
import java.util.concurrent.TimeoutException;
49+
import java.util.concurrent.atomic.AtomicBoolean;
4950
import java.util.concurrent.CompletionStage;
50-
import java.util.concurrent.CountDownLatch;
5151
import java.util.concurrent.TimeUnit;
52+
import jakarta.servlet.http.HttpServletRequest;
53+
import jakarta.servlet.http.HttpServletResponse;
5254
import org.apache.commons.lang3.StringUtils;
5355
import org.apache.http.HttpStatus;
5456

@@ -66,34 +68,41 @@ public abstract class AbstractHttpExecutor {
6668

6769
protected void execute(HttpServletRequest request, HttpServletResponse response,
6870
RpcMethodInfoAndInvoker methodInfoAndInvoker) {
69-
71+
AtomicBoolean responded = new AtomicBoolean(false);
7072
try {
7173

7274
DefRequest rpcRequest = buildDefRequest(request, response, methodInfoAndInvoker);
7375

74-
CountDownLatch countDownLatch = new CountDownLatch(1);
76+
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
7577

7678
// use a thread pool for asynchronous processing
77-
invokeRpcRequest(methodInfoAndInvoker.getInvoker(), rpcRequest, countDownLatch);
79+
invokeRpcRequest(methodInfoAndInvoker.getInvoker(), rpcRequest, completionFuture, responded);
7880

7981
// If the request carries a timeout, use this timeout to wait for the request to be processed.
8082
// If not carried, use the default timeout.
8183
long requestTimeout = rpcRequest.getMeta().getTimeout();
8284
if (requestTimeout <= 0) {
8385
requestTimeout = methodInfoAndInvoker.getInvoker().getConfig().getRequestTimeout();
8486
}
85-
if (requestTimeout > 0 && !countDownLatch.await(requestTimeout, TimeUnit.MILLISECONDS)) {
86-
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_TIMEOUT_ERR,
87-
"wait http request execute timeout");
87+
if (requestTimeout > 0) {
88+
try {
89+
completionFuture.get(requestTimeout, TimeUnit.MILLISECONDS);
90+
} catch (TimeoutException ex) {
91+
if (responded.compareAndSet(false, true)) {
92+
doErrorReply(request, response,
93+
TRpcException.newFrameException(ErrorCode.TRPC_SERVER_TIMEOUT_ERR,
94+
"wait http request execute timeout"));
95+
}
96+
}
8897
} else {
89-
countDownLatch.await();
98+
completionFuture.get();
9099
}
91-
92100
} catch (Exception ex) {
93101
logger.error("dispatch request [{}] error", request, ex);
94-
doErrorReply(request, response, ex);
102+
if (responded.compareAndSet(false, true)) {
103+
doErrorReply(request, response, ex);
104+
}
95105
}
96-
97106
}
98107

99108
/**
@@ -108,55 +117,83 @@ protected void execute(HttpServletRequest request, HttpServletResponse response,
108117
/**
109118
* Request processing
110119
*
111-
* @param countDownLatch latch used to wait for the request processing
120+
* @param invoker the invoker
121+
* @param rpcRequest the rpc request
122+
* @param completionFuture the completion future
123+
* @param responded the responded flag
112124
*/
113-
private void invokeRpcRequest(ProviderInvoker<?> invoker, DefRequest rpcRequest, CountDownLatch countDownLatch) {
125+
private void invokeRpcRequest(ProviderInvoker<?> invoker, DefRequest rpcRequest,
126+
CompletableFuture<Void> completionFuture,
127+
AtomicBoolean responded) {
114128

115129
WorkerPool workerPool = invoker.getConfig().getWorkerPoolObj();
116130

117131
if (null == workerPool) {
118132
logger.error("dispatch rpcRequest [{}] error, workerPool is empty", rpcRequest);
119-
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOSERVICE_ERR,
120-
"not found service, workerPool is empty");
133+
completionFuture.completeExceptionally(TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOSERVICE_ERR,
134+
"not found service, workerPool is empty"));
135+
return;
121136
}
122137

123138
workerPool.execute(() -> {
124-
125-
// Get the original http response
126-
HttpServletResponse response = getOriginalResponse(rpcRequest);
127-
128-
// Invoke the routing implementation method to handle the request.
129-
CompletionStage<Response> future = invoker.invoke(rpcRequest);
130-
future.whenComplete((result, t) -> {
131-
try {
132-
// Throw the call exception, which will be handled uniformly by the exception handling program.
133-
if (t != null) {
134-
throw t;
135-
}
136-
137-
// Throw a business logic exception, which will be handled uniformly
138-
// by the exception handling program.
139-
Throwable ex = result.getException();
140-
if (ex != null) {
141-
throw ex;
139+
try {
140+
// Get the original http response
141+
HttpServletResponse response = getOriginalResponse(rpcRequest);
142+
// Invoke the routing implementation method to handle the request.
143+
CompletionStage<Response> rpcFuture = invoker.invoke(rpcRequest);
144+
145+
rpcFuture.whenComplete((result, throwable) -> {
146+
try {
147+
if (responded.get()) {
148+
return;
149+
}
150+
151+
// Throw the call exception, which will be handled uniformly by the exception handling program.
152+
if (throwable != null) {
153+
throw throwable;
154+
}
155+
156+
// Throw a business logic exception, which will be handled uniformly
157+
// by the exception handling program.
158+
if (result.getException() != null) {
159+
throw result.getException();
160+
}
161+
162+
// normal response
163+
if (responded.compareAndSet(false, true)) {
164+
response.setStatus(HttpStatus.SC_OK);
165+
httpCodec.writeHttpResponse(response, result);
166+
response.flushBuffer();
167+
}
168+
169+
completionFuture.complete(null);
170+
} catch (Throwable t) {
171+
handleError(t, rpcRequest, response, responded, completionFuture);
142172
}
173+
});
143174

144-
// normal response
145-
response.setStatus(HttpStatus.SC_OK);
146-
httpCodec.writeHttpResponse(response, result);
147-
response.flushBuffer();
148-
} catch (Throwable e) {
149-
HttpServletRequest request = getOriginalRequest(rpcRequest);
150-
logger.warn("reply message error, channel: [{}], msg:[{}]", request.getRemoteAddr(), request, e);
151-
httpErrorReply(request, response,
152-
ErrorResponse.create(request, HttpStatus.SC_SERVICE_UNAVAILABLE, e));
153-
} finally {
154-
countDownLatch.countDown();
155-
}
156-
});
175+
} catch (Exception e) {
176+
handleError(e, rpcRequest, getOriginalResponse(rpcRequest), responded, completionFuture);
177+
}
157178
});
158179
}
159180

181+
/**
182+
* Handle error
183+
*/
184+
private void handleError(Throwable t, DefRequest rpcRequest, HttpServletResponse response,
185+
AtomicBoolean responded, CompletableFuture<Void> completionFuture) {
186+
try {
187+
if (responded.compareAndSet(false, true)) {
188+
HttpServletRequest request = getOriginalRequest(rpcRequest);
189+
logger.warn("reply message error, channel: [{}], msg:[{}]", request.getRemoteAddr(), request, t);
190+
httpErrorReply(request, response, ErrorResponse.create(request, HttpStatus.SC_SERVICE_UNAVAILABLE, t));
191+
}
192+
} finally {
193+
completionFuture.completeExceptionally(t);
194+
}
195+
}
196+
160197
/**
161198
* Build the context request.
162199
*
@@ -392,7 +429,6 @@ private void setRpcServerContext(HttpServletRequest request, HttpServletResponse
392429
// to maintain consistency.
393430
rpcRequest.getAttachments().put(header, value.getBytes(StandardCharsets.UTF_8));
394431
}
395-
logger.debug("request attachment: {}", JsonUtils.toJson(rpcRequest.getAttachments()));
396432
}
397433

398434
/**
@@ -488,4 +524,4 @@ private String getString(String[] callInfos, int length, int cursor) {
488524
return callInfos.length < length ? StringUtils.EMPTY : callInfos[cursor];
489525
}
490526

491-
}
527+
}

0 commit comments

Comments
 (0)