Skip to content

Commit 4769729

Browse files
committed
Improve code
1 parent 57d781b commit 4769729

24 files changed

Lines changed: 2070 additions & 2964 deletions

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/client/impl/AbstractWebSocketClient.java

Lines changed: 0 additions & 128 deletions
This file was deleted.

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/client/impl/DefaultWebSocketClient.java

Lines changed: 159 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,185 @@
2626
*/
2727
package org.apache.hc.client5.http.websocket.client.impl;
2828

29+
import java.net.URI;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
2933
import java.util.concurrent.ThreadFactory;
34+
import java.util.concurrent.atomic.AtomicReference;
3035

36+
import org.apache.hc.client5.http.websocket.api.WebSocket;
3137
import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
38+
import org.apache.hc.client5.http.websocket.api.WebSocketListener;
39+
import org.apache.hc.client5.http.websocket.client.CloseableWebSocketClient;
40+
import org.apache.hc.client5.http.websocket.client.impl.connector.WebSocketProtocolConnector;
41+
import org.apache.hc.client5.http.websocket.client.impl.protocol.Http1UpgradeProtocol;
42+
import org.apache.hc.client5.http.websocket.client.impl.protocol.Http2ExtendedConnectProtocol;
43+
import org.apache.hc.client5.http.websocket.client.impl.protocol.WebSocketProtocolStrategy;
3244
import org.apache.hc.core5.annotation.Contract;
3345
import org.apache.hc.core5.annotation.Internal;
3446
import org.apache.hc.core5.annotation.ThreadingBehavior;
3547
import org.apache.hc.core5.http.HttpHost;
48+
import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester;
3649
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
50+
import org.apache.hc.core5.http.protocol.HttpContext;
3751
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
52+
import org.apache.hc.core5.io.CloseMode;
3853
import org.apache.hc.core5.pool.ManagedConnPool;
54+
import org.apache.hc.core5.reactor.IOReactorStatus;
3955
import org.apache.hc.core5.reactor.IOSession;
56+
import org.apache.hc.core5.util.Args;
57+
import org.apache.hc.core5.util.TimeValue;
58+
import org.slf4j.Logger;
59+
import org.slf4j.LoggerFactory;
4060

61+
/**
62+
* Default WebSocket client implementation that manages the I/O reactor lifecycle,
63+
* connection pool, and protocol strategy selection (HTTP/1.1 Upgrade or HTTP/2
64+
* Extended CONNECT with automatic fallback).
65+
*
66+
* <p>Instances are created via
67+
* {@link org.apache.hc.client5.http.websocket.client.WebSocketClientBuilder}.</p>
68+
*/
4169
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
4270
@Internal
43-
public class DefaultWebSocketClient extends InternalWebSocketClientBase {
71+
public class DefaultWebSocketClient extends CloseableWebSocketClient {
72+
73+
private static final Logger LOG = LoggerFactory.getLogger(DefaultWebSocketClient.class);
74+
75+
enum Status { READY, RUNNING, TERMINATED }
76+
77+
// Lifecycle
78+
private final AsyncRequester primaryRequester;
79+
private final AsyncRequester[] extraRequesters;
80+
private final ExecutorService executorService;
81+
private final AtomicReference<Status> status;
82+
83+
// Protocol
84+
private final WebSocketClientConfig defaultConfig;
85+
private final ManagedConnPool<HttpHost, IOSession> connPool;
86+
private final WebSocketProtocolStrategy h1;
87+
private final WebSocketProtocolStrategy h2;
88+
private final WebSocketProtocolConnector connector;
4489

4590
public DefaultWebSocketClient(
4691
final HttpAsyncRequester requester,
4792
final ManagedConnPool<HttpHost, IOSession> connPool,
4893
final WebSocketClientConfig defaultConfig,
4994
final ThreadFactory threadFactory,
5095
final H2MultiplexingRequester h2Requester) {
51-
super(requester, connPool, defaultConfig, threadFactory, h2Requester);
96+
super();
97+
this.primaryRequester = Args.notNull(requester, "requester");
98+
this.extraRequesters = h2Requester != null
99+
? new AsyncRequester[]{h2Requester}
100+
: new AsyncRequester[0];
101+
final int threads = Math.max(1, 1 + this.extraRequesters.length);
102+
this.executorService = Executors.newFixedThreadPool(threads, threadFactory);
103+
this.status = new AtomicReference<>(Status.READY);
104+
105+
this.connPool = Args.notNull(connPool, "connPool");
106+
this.defaultConfig = defaultConfig != null ? defaultConfig : WebSocketClientConfig.custom().build();
107+
this.h1 = newH1Protocol(requester, connPool);
108+
this.h2 = newH2Protocol(h2Requester);
109+
this.connector = h2 != null ? new WebSocketProtocolConnector(h2, h1) : null;
110+
}
111+
112+
/**
113+
* HTTP/1.1 Upgrade protocol.
114+
*/
115+
protected WebSocketProtocolStrategy newH1Protocol(
116+
final HttpAsyncRequester requester,
117+
final ManagedConnPool<HttpHost, IOSession> connPool) {
118+
return new Http1UpgradeProtocol(requester, connPool);
119+
}
120+
121+
/**
122+
* HTTP/2 Extended CONNECT protocol.
123+
*/
124+
protected WebSocketProtocolStrategy newH2Protocol(final H2MultiplexingRequester requester) {
125+
return requester != null ? new Http2ExtendedConnectProtocol(requester) : null;
126+
}
127+
128+
// ---- Lifecycle ----
129+
130+
@Override
131+
public final void start() {
132+
if (status.compareAndSet(Status.READY, Status.RUNNING)) {
133+
executorService.execute(primaryRequester::start);
134+
for (final AsyncRequester requester : extraRequesters) {
135+
executorService.execute(requester::start);
136+
}
137+
}
138+
}
139+
140+
boolean isRunning() {
141+
return status.get() == Status.RUNNING;
142+
}
143+
144+
@Override
145+
public final IOReactorStatus getStatus() {
146+
return primaryRequester.getStatus();
147+
}
148+
149+
@Override
150+
public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
151+
primaryRequester.awaitShutdown(waitTime);
152+
for (final AsyncRequester requester : extraRequesters) {
153+
requester.awaitShutdown(waitTime);
154+
}
155+
}
156+
157+
@Override
158+
public final void initiateShutdown() {
159+
if (LOG.isDebugEnabled()) {
160+
LOG.debug("Initiating shutdown");
161+
}
162+
primaryRequester.initiateShutdown();
163+
for (final AsyncRequester requester : extraRequesters) {
164+
requester.initiateShutdown();
165+
}
166+
}
167+
168+
@Override
169+
public final void close(final CloseMode closeMode) {
170+
if (LOG.isDebugEnabled()) {
171+
LOG.debug("Shutdown {}", closeMode);
172+
}
173+
primaryRequester.initiateShutdown();
174+
primaryRequester.close(closeMode != null ? closeMode : CloseMode.IMMEDIATE);
175+
for (final AsyncRequester requester : extraRequesters) {
176+
requester.initiateShutdown();
177+
requester.close(closeMode != null ? closeMode : CloseMode.IMMEDIATE);
178+
}
179+
executorService.shutdownNow();
180+
try {
181+
final CloseMode mode = closeMode != null ? closeMode : CloseMode.GRACEFUL;
182+
connPool.close(mode);
183+
} catch (final Exception ex) {
184+
if (LOG.isWarnEnabled()) {
185+
LOG.warn("Error closing pool: {}", ex.getMessage(), ex);
186+
}
187+
}
188+
}
189+
190+
@Override
191+
public void close() {
192+
close(CloseMode.GRACEFUL);
193+
}
194+
195+
// ---- Protocol ----
196+
197+
@Override
198+
protected CompletableFuture<WebSocket> doConnect(
199+
final URI uri,
200+
final WebSocketListener listener,
201+
final WebSocketClientConfig cfgOrNull,
202+
final HttpContext context) {
203+
204+
final WebSocketClientConfig cfg = cfgOrNull != null ? cfgOrNull : defaultConfig;
205+
if (cfg.isHttp2Enabled() && connector != null) {
206+
return connector.connect(uri, listener, cfg, context);
207+
}
208+
return h1.connect(uri, listener, cfg, context);
52209
}
53210
}

0 commit comments

Comments
 (0)