Skip to content

Commit 03816cf

Browse files
committed
feat: support long link
1 parent 2bc4705 commit 03816cf

4 files changed

Lines changed: 113 additions & 3 deletions

File tree

trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2ConsumerInvoker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ private Response handleResponse(Request request, SimpleHttpResponse simpleHttpRe
137137
*/
138138
private SimpleHttpResponse execute(Request request, int requestTimeout,
139139
SimpleHttpRequest simpleHttpRequest) throws Exception {
140-
CloseableHttpAsyncClient httpAsyncClient = ((Http2cRpcClient) client).getHttpAsyncClient();
140+
Http2cRpcClient http2cRpcClient = (Http2cRpcClient) client;
141+
// Refresh idle counter so the cluster manager's reconnect-check timer keeps treating
142+
// this client as healthy while it's actively serving requests.
143+
http2cRpcClient.markUsed();
144+
CloseableHttpAsyncClient httpAsyncClient = http2cRpcClient.getHttpAsyncClient();
141145
Future<SimpleHttpResponse> httpResponseFuture = httpAsyncClient.execute(simpleHttpRequest,
142146
new FutureCallback<SimpleHttpResponse>() {
143147
@Override

trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/Http2cRpcClient.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.tencent.trpc.core.rpc.AbstractRpcClient;
2020
import com.tencent.trpc.core.rpc.ConsumerInvoker;
2121
import java.io.IOException;
22+
import java.util.concurrent.TimeUnit;
2223
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
2324
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
2425

@@ -29,10 +30,23 @@ public class Http2cRpcClient extends AbstractRpcClient {
2930

3031
private static final Logger logger = LoggerFactory.getLogger(HttpRpcClient.class);
3132

33+
/**
34+
* If this client has not been used by any RPC for longer than this window, the periodic
35+
* scanner in {@code RpcClusterClientManager} will treat it as unavailable and eventually
36+
* close & evict it. The window is intentionally large so that any actively-used client is
37+
* never affected. See {@link HttpRpcClient} for the same mechanism on the HTTP/1.1 path.
38+
*/
39+
private static final long IDLE_UNAVAILABLE_THRESHOLD_NANOS = TimeUnit.MINUTES.toNanos(10);
40+
3241
/**
3342
* Asynchronous HTTP client
3443
*/
3544
protected CloseableHttpAsyncClient httpAsyncClient;
45+
/**
46+
* Timestamp (System.nanoTime()) of the most recent RPC sent through this client. Updated by
47+
* {@link Http2ConsumerInvoker} on each request.
48+
*/
49+
private volatile long lastUsedNanos = System.nanoTime();
3650

3751
public Http2cRpcClient(ProtocolConfig config) {
3852
setConfig(config);
@@ -74,6 +88,27 @@ public <T> ConsumerInvoker<T> createInvoker(ConsumerConfig<T> consumerConfig) {
7488
return new Http2ConsumerInvoker<>(this, consumerConfig, protocolConfig);
7589
}
7690

91+
/**
92+
* Record that this client just served (or is about to serve) an RPC. Called by
93+
* {@link Http2ConsumerInvoker} on every request.
94+
*/
95+
public void markUsed() {
96+
lastUsedNanos = System.nanoTime();
97+
}
98+
99+
/**
100+
* Reports the client as unavailable if it has been idle longer than
101+
* {@link #IDLE_UNAVAILABLE_THRESHOLD_NANOS}. This lets the cluster manager's periodic
102+
* reconnect-check timer eventually evict orphaned clients (e.g. after backend IP rotation).
103+
*/
104+
@Override
105+
public boolean isAvailable() {
106+
if (!super.isAvailable()) {
107+
return false;
108+
}
109+
return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS;
110+
}
111+
77112
public CloseableHttpAsyncClient getHttpAsyncClient() {
78113
return httpAsyncClient;
79114
}

trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpConsumerInvoker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ public HttpConsumerInvoker(HttpRpcClient client, ConsumerConfig<T> config,
6363
public Response send(Request request) throws Exception {
6464
HttpPost httpPost = buildRequest(request);
6565

66-
CloseableHttpClient httpClient = ((HttpRpcClient) client).getHttpClient();
66+
HttpRpcClient httpRpcClient = (HttpRpcClient) client;
67+
// Refresh idle counter so the cluster manager's reconnect-check timer keeps treating
68+
// this client as healthy while it's actively serving requests.
69+
httpRpcClient.markUsed();
70+
CloseableHttpClient httpClient = httpRpcClient.getHttpClient();
6771

6872
try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) {
6973
return handleResponse(request, httpResponse);

trpc-proto/trpc-proto-http/src/main/java/com/tencent/trpc/proto/http/client/HttpRpcClient.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,54 @@
1818
import com.tencent.trpc.core.rpc.AbstractRpcClient;
1919
import com.tencent.trpc.core.rpc.ConsumerInvoker;
2020
import java.io.IOException;
21+
import java.util.concurrent.TimeUnit;
2122
import org.apache.http.impl.client.CloseableHttpClient;
2223
import org.apache.http.impl.client.HttpClients;
2324
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
2425

2526
/**
2627
* HTTP protocol client.
28+
* <p>Long-connection mode: connections are pooled by Apache {@link PoolingHttpClientConnectionManager}
29+
* and reused across requests via HTTP/1.1 keep-alive. Two safeguards are enabled by default to
30+
* keep the pool healthy in long-running processes:
31+
* <ul>
32+
* <li>{@code validateAfterInactivity}: re-check a connection's liveness before reuse if it
33+
* has been idle for a short period (avoids the classic "stale connection / NoHttpResponseException"
34+
* when the server has half-closed an idle keep-alive connection);</li>
35+
* <li>{@code evictIdleConnections}: a small background thread evicts connections that have
36+
* been idle longer than the configured limit, freeing OS file descriptors.</li>
37+
* </ul>
2738
*/
2839
public class HttpRpcClient extends AbstractRpcClient {
2940

3041
private static final Logger logger = LoggerFactory.getLogger(HttpRpcClient.class);
3142

43+
/**
44+
* Validate a pooled connection before reuse if it has been idle for at least this many
45+
* milliseconds. Cheap heuristic that catches most server-side half-closed keep-alive sockets.
46+
*/
47+
private static final int VALIDATE_AFTER_INACTIVITY_MS = 2000;
48+
/**
49+
* Evict pooled connections that have been idle for longer than this duration.
50+
*/
51+
private static final long EVICT_IDLE_CONNECTIONS_SECONDS = 60L;
52+
/**
53+
* If this client has not been used by any RPC for longer than this window, the periodic
54+
* scanner in {@code RpcClusterClientManager} will treat it as unavailable. After
55+
* a few consecutive unavailable observations the client gets closed and evicted from the
56+
* cluster cache, which is how we reclaim {@link HttpRpcClient} instances orphaned by backend
57+
* IP rotation (e.g. K8s pod IP drift). The window is intentionally large so that any
58+
* actively-used client is never affected.
59+
*/
60+
private static final long IDLE_UNAVAILABLE_THRESHOLD_NANOS =
61+
java.util.concurrent.TimeUnit.MINUTES.toNanos(10);
62+
3263
private CloseableHttpClient httpClient;
64+
/**
65+
* Timestamp (System.nanoTime()) of the most recent RPC sent through this client. Updated by
66+
* {@link HttpConsumerInvoker} on each send.
67+
*/
68+
private volatile long lastUsedNanos = System.nanoTime();
3369

3470
public HttpRpcClient(ProtocolConfig config) {
3571
setConfig(config);
@@ -44,7 +80,16 @@ protected void doOpen() {
4480
// If there is only one route, the maximum number of connections for a single route is the same
4581
// as the maximum number of connections for the entire connection pool.
4682
cm.setDefaultMaxPerRoute(maxConns);
47-
httpClient = HttpClients.custom().setConnectionManager(cm).build();
83+
// Re-validate idle pooled connections before reuse so we do not send a request through a
84+
// socket the server has already half-closed.
85+
cm.setValidateAfterInactivity(VALIDATE_AFTER_INACTIVITY_MS);
86+
httpClient = HttpClients.custom()
87+
.setConnectionManager(cm)
88+
// Background eviction of stale & long-idle connections; keeps the pool tidy in
89+
// long-running processes without affecting hot connections.
90+
.evictExpiredConnections()
91+
.evictIdleConnections(EVICT_IDLE_CONNECTIONS_SECONDS, TimeUnit.SECONDS)
92+
.build();
4893
}
4994

5095
@Override
@@ -64,6 +109,28 @@ public <T> ConsumerInvoker<T> createInvoker(ConsumerConfig<T> consumerConfig) {
64109
return new HttpConsumerInvoker<>(this, consumerConfig, protocolConfig);
65110
}
66111

112+
/**
113+
* Record that this client just served (or is about to serve) an RPC. Called by
114+
* {@link HttpConsumerInvoker} on every request.
115+
*/
116+
public void markUsed() {
117+
lastUsedNanos = System.nanoTime();
118+
}
119+
120+
/**
121+
* Reports the client as unavailable if it has been idle longer than
122+
* {@link #IDLE_UNAVAILABLE_THRESHOLD_NANOS}. This lets the cluster manager's periodic
123+
* reconnect-check timer eventually evict orphaned clients (e.g. after backend IP rotation)
124+
* even though Apache HttpClient itself has no notion of "remote permanently gone".
125+
*/
126+
@Override
127+
public boolean isAvailable() {
128+
if (!super.isAvailable()) {
129+
return false;
130+
}
131+
return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS;
132+
}
133+
67134
public CloseableHttpClient getHttpClient() {
68135
return httpClient;
69136
}

0 commit comments

Comments
 (0)