Skip to content

Commit 0feeb35

Browse files
committed
chore(com): Adds Jetty experiment
1 parent c8d8d7d commit 0feeb35

6 files changed

Lines changed: 363 additions & 1 deletion

File tree

communication/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ dependencies {
2424
implementation(libs.netty.handler)
2525
implementation(libs.netty.handler.proxy)
2626
implementation(libs.apache.httpclient5)
27+
implementation(libs.jetty.httpclient)
28+
implementation(libs.jetty.unixsocket)
2729
// metrics-lib is needed rather than metrics-api to change the default port of StatsD connection manager
2830
// TODO Could help decoupling it later to only depend on metrics-api
2931
implementation(project(":products:metrics:metrics-lib"))
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package datadog.communication.http.client.jetty;
2+
3+
import datadog.communication.http.HttpRetryPolicy;
4+
import datadog.communication.http.client.HttpClient;
5+
import datadog.communication.http.client.HttpClientRequest;
6+
import datadog.communication.http.client.HttpClientResponse;
7+
import datadog.communication.http.client.HttpTransport;
8+
import java.io.IOException;
9+
import java.net.URI;
10+
import java.util.ArrayList;
11+
import java.util.LinkedHashMap;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.Set;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.ExecutionException;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.TimeoutException;
19+
import javax.annotation.Nullable;
20+
import org.eclipse.jetty.client.HttpClientTransport;
21+
import org.eclipse.jetty.client.HttpProxy;
22+
import org.eclipse.jetty.client.api.Authentication;
23+
import org.eclipse.jetty.client.api.ContentResponse;
24+
import org.eclipse.jetty.client.api.Request;
25+
import org.eclipse.jetty.client.util.BasicAuthentication;
26+
import org.eclipse.jetty.client.util.BytesContentProvider;
27+
import org.eclipse.jetty.http.HttpField;
28+
import org.eclipse.jetty.unixsocket.client.HttpClientTransportOverUnixSockets;
29+
30+
final class JettyHttpClient implements HttpClient {
31+
private final long requestTimeoutMillis;
32+
private final HttpRetryPolicy.Factory retryPolicyFactory;
33+
private final org.eclipse.jetty.client.HttpClient client;
34+
private final boolean closeClientOnClose;
35+
private final Set<Request> inFlightRequests = ConcurrentHashMap.newKeySet();
36+
private volatile boolean closed;
37+
38+
JettyHttpClient(
39+
HttpTransport transport,
40+
@Nullable String unixDomainSocketPath,
41+
long connectTimeoutMillis,
42+
long requestTimeoutMillis,
43+
long responseTimeoutMillis,
44+
@Nullable String proxyHost,
45+
@Nullable Integer proxyPort,
46+
@Nullable String proxyUsername,
47+
@Nullable String proxyPassword,
48+
HttpRetryPolicy.Factory retryPolicyFactory,
49+
@Nullable org.eclipse.jetty.client.HttpClient externallyProvidedClient,
50+
boolean closeClientOnClose) {
51+
if (transport == HttpTransport.NAMED_PIPE) {
52+
throw new IllegalArgumentException("Jetty HTTP client supports only TCP and UDS transport");
53+
}
54+
55+
this.requestTimeoutMillis = requestTimeoutMillis;
56+
this.retryPolicyFactory = retryPolicyFactory;
57+
58+
if (externallyProvidedClient != null) {
59+
this.client = externallyProvidedClient;
60+
this.closeClientOnClose = closeClientOnClose;
61+
startClient(this.client);
62+
return;
63+
}
64+
65+
org.eclipse.jetty.client.HttpClient client = createClient(transport, unixDomainSocketPath);
66+
client.setConnectTimeout(connectTimeoutMillis);
67+
client.setIdleTimeout(responseTimeoutMillis);
68+
if (proxyHost != null && proxyPort != null) {
69+
client.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort));
70+
if (proxyUsername != null) {
71+
URI proxyUri = URI.create("http://" + proxyHost + ":" + proxyPort);
72+
client
73+
.getAuthenticationStore()
74+
.addAuthentication(
75+
new BasicAuthentication(
76+
proxyUri, Authentication.ANY_REALM, proxyUsername, proxyPassword));
77+
}
78+
}
79+
startClient(client);
80+
this.client = client;
81+
this.closeClientOnClose = true;
82+
}
83+
84+
@Override
85+
public HttpClientResponse execute(HttpClientRequest request) throws IOException {
86+
if (closed) {
87+
throw new IOException("http client is closed");
88+
}
89+
90+
try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
91+
while (true) {
92+
try {
93+
HttpClientResponse response = executeOnce(request);
94+
if (!retryPolicy.shouldRetry(new ResponseAdapter(response))) {
95+
return response;
96+
}
97+
} catch (Exception e) {
98+
IOException io = e instanceof IOException ? (IOException) e : new IOException(e);
99+
if (!retryPolicy.shouldRetry(io)) {
100+
throw io;
101+
}
102+
}
103+
retryPolicy.backoff();
104+
}
105+
}
106+
}
107+
108+
private HttpClientResponse executeOnce(HttpClientRequest request) throws IOException {
109+
Request jettyRequest = toJettyRequest(request);
110+
inFlightRequests.add(jettyRequest);
111+
try {
112+
ContentResponse response =
113+
jettyRequest.timeout(requestTimeoutMillis, TimeUnit.MILLISECONDS).send();
114+
return toFacadeResponse(response);
115+
} catch (InterruptedException e) {
116+
Thread.currentThread().interrupt();
117+
throw new IOException("request interrupted", e);
118+
} catch (ExecutionException e) {
119+
Throwable cause = e.getCause();
120+
throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
121+
} catch (TimeoutException e) {
122+
throw new IOException("request timeout", e);
123+
} finally {
124+
inFlightRequests.remove(jettyRequest);
125+
}
126+
}
127+
128+
private Request toJettyRequest(HttpClientRequest request) {
129+
Request jettyRequest = client.newRequest(request.uri()).method(request.method());
130+
for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
131+
for (String value : header.getValue()) {
132+
jettyRequest.header(header.getKey(), value);
133+
}
134+
}
135+
136+
byte[] body = request.body();
137+
if (body.length > 0) {
138+
jettyRequest.content(new BytesContentProvider(body));
139+
}
140+
return jettyRequest;
141+
}
142+
143+
private static HttpClientResponse toFacadeResponse(ContentResponse response) {
144+
Map<String, List<String>> headers = new LinkedHashMap<>();
145+
for (HttpField header : response.getHeaders()) {
146+
headers
147+
.computeIfAbsent(header.getName(), ignored -> new ArrayList<>())
148+
.add(header.getValue());
149+
}
150+
return new HttpClientResponse(response.getStatus(), headers, response.getContent());
151+
}
152+
153+
@Override
154+
public void close() {
155+
closed = true;
156+
157+
for (Request request : inFlightRequests) {
158+
request.abort(new IOException("http client is closed"));
159+
}
160+
inFlightRequests.clear();
161+
162+
if (closeClientOnClose) {
163+
try {
164+
client.stop();
165+
} catch (Exception ignored) {
166+
}
167+
}
168+
}
169+
170+
private static void startClient(org.eclipse.jetty.client.HttpClient client) {
171+
if (!client.isStarted()) {
172+
try {
173+
client.start();
174+
} catch (Exception e) {
175+
throw new IllegalStateException("Unable to start Jetty HTTP client", e);
176+
}
177+
}
178+
}
179+
180+
private static org.eclipse.jetty.client.HttpClient createClient(
181+
HttpTransport transport, @Nullable String unixDomainSocketPath) {
182+
if (transport == HttpTransport.UNIX_DOMAIN_SOCKET) {
183+
HttpClientTransport udsTransport =
184+
new HttpClientTransportOverUnixSockets(unixDomainSocketPath);
185+
return new org.eclipse.jetty.client.HttpClient(udsTransport, null);
186+
}
187+
return new org.eclipse.jetty.client.HttpClient();
188+
}
189+
190+
private static final class ResponseAdapter implements HttpRetryPolicy.Response {
191+
private final HttpClientResponse response;
192+
193+
private ResponseAdapter(HttpClientResponse response) {
194+
this.response = response;
195+
}
196+
197+
@Override
198+
public int code() {
199+
return response.statusCode();
200+
}
201+
202+
@Override
203+
public @Nullable String header(String name) {
204+
return response.header(name);
205+
}
206+
}
207+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package datadog.communication.http.client.jetty;
2+
3+
import datadog.communication.http.HttpRetryPolicy;
4+
import datadog.communication.http.client.HttpClient;
5+
import datadog.communication.http.client.HttpClientBuilder;
6+
import datadog.communication.http.client.HttpTransport;
7+
import javax.annotation.Nullable;
8+
9+
/** Builder used to configure and create {@link JettyHttpClient}. */
10+
public final class JettyHttpClientBuilder implements HttpClientBuilder<JettyHttpClientBuilder> {
11+
private HttpTransport transport = HttpTransport.TCP;
12+
private String unixDomainSocketPath;
13+
private long connectTimeoutMillis = 10_000;
14+
private long requestTimeoutMillis = 10_000;
15+
private long responseTimeoutMillis = 10_000;
16+
private String proxyHost;
17+
private Integer proxyPort;
18+
private String proxyUsername;
19+
private String proxyPassword;
20+
private HttpRetryPolicy.Factory retryPolicyFactory = HttpRetryPolicy.Factory.NEVER_RETRY;
21+
private org.eclipse.jetty.client.HttpClient client;
22+
private boolean closeClientOnClose = true;
23+
24+
@Override
25+
public JettyHttpClientBuilder transport(HttpTransport transport) {
26+
if (transport == HttpTransport.NAMED_PIPE) {
27+
throw new IllegalArgumentException(
28+
"Jetty HTTP client does not support named pipe transport, got: " + transport);
29+
}
30+
this.transport = transport;
31+
return this;
32+
}
33+
34+
@Override
35+
public JettyHttpClientBuilder unixDomainSocketPath(@Nullable String unixDomainSocketPath) {
36+
this.unixDomainSocketPath = unixDomainSocketPath;
37+
return this;
38+
}
39+
40+
@Override
41+
public JettyHttpClientBuilder namedPipe(@Nullable String namedPipe) {
42+
if (namedPipe != null && !namedPipe.isEmpty()) {
43+
throw new UnsupportedOperationException(
44+
"Jetty HTTP client does not support named pipe transport");
45+
}
46+
return this;
47+
}
48+
49+
@Override
50+
public JettyHttpClientBuilder connectTimeoutMillis(long connectTimeoutMillis) {
51+
this.connectTimeoutMillis = connectTimeoutMillis;
52+
return this;
53+
}
54+
55+
@Override
56+
public JettyHttpClientBuilder requestTimeoutMillis(long requestTimeoutMillis) {
57+
this.requestTimeoutMillis = requestTimeoutMillis;
58+
return this;
59+
}
60+
61+
public JettyHttpClientBuilder responseTimeoutMillis(long responseTimeoutMillis) {
62+
this.responseTimeoutMillis = responseTimeoutMillis;
63+
return this;
64+
}
65+
66+
@Override
67+
public JettyHttpClientBuilder proxy(String proxyHost, int proxyPort) {
68+
return proxy(proxyHost, proxyPort, null, null);
69+
}
70+
71+
@Override
72+
public JettyHttpClientBuilder proxy(
73+
String proxyHost,
74+
int proxyPort,
75+
@Nullable String proxyUsername,
76+
@Nullable String proxyPassword) {
77+
this.proxyHost = proxyHost;
78+
this.proxyPort = proxyPort;
79+
this.proxyUsername = proxyUsername;
80+
this.proxyPassword = proxyPassword;
81+
return this;
82+
}
83+
84+
@Override
85+
public JettyHttpClientBuilder retryPolicyFactory(HttpRetryPolicy.Factory retryPolicyFactory) {
86+
this.retryPolicyFactory = retryPolicyFactory;
87+
return this;
88+
}
89+
90+
public JettyHttpClientBuilder client(
91+
org.eclipse.jetty.client.HttpClient client, boolean closeClientOnClose) {
92+
this.client = client;
93+
this.closeClientOnClose = closeClientOnClose;
94+
return this;
95+
}
96+
97+
@Override
98+
public HttpClient build() {
99+
validate();
100+
return new JettyHttpClient(
101+
transport,
102+
unixDomainSocketPath,
103+
connectTimeoutMillis,
104+
requestTimeoutMillis,
105+
responseTimeoutMillis,
106+
proxyHost,
107+
proxyPort,
108+
proxyUsername,
109+
proxyPassword,
110+
retryPolicyFactory,
111+
client,
112+
closeClientOnClose);
113+
}
114+
115+
private void validate() {
116+
if (connectTimeoutMillis <= 0 || requestTimeoutMillis <= 0 || responseTimeoutMillis <= 0) {
117+
throw new IllegalArgumentException("timeouts must be strictly positive");
118+
}
119+
if (proxyHost != null && proxyPort == null) {
120+
throw new IllegalArgumentException("proxy port must be provided when proxy host is set");
121+
}
122+
if (proxyHost != null && transport != HttpTransport.TCP) {
123+
throw new IllegalArgumentException("proxy is currently supported only for TCP transport");
124+
}
125+
if (transport == HttpTransport.UNIX_DOMAIN_SOCKET
126+
&& (unixDomainSocketPath == null || unixDomainSocketPath.isEmpty())) {
127+
throw new IllegalArgumentException(
128+
"unix domain socket path must be set for UNIX_DOMAIN_SOCKET transport");
129+
}
130+
}
131+
}

communication/src/test/java/datadog/communication/http/client/HttpClientContractTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import datadog.communication.http.HttpRetryPolicy;
1313
import datadog.communication.http.client.ahc.ApacheAsyncHttpClientBuilder;
14+
import datadog.communication.http.client.jetty.JettyHttpClientBuilder;
1415
import datadog.communication.http.client.netty.NettyHttpClientBuilder;
1516
import io.netty.channel.nio.NioEventLoopGroup;
1617
import java.io.Closeable;
@@ -44,6 +45,7 @@ public class HttpClientContractTest {
4445
public static final String CLIENT_IMPL_PARAMETER = "datadog.http.client.impl";
4546
public static final String NETTY = "netty";
4647
public static final String AHC = "ahc";
48+
public static final String JETTY = "jetty";
4749

4850
private final List<Closeable> closeables = new ArrayList<>();
4951
private MockWebServer server;
@@ -58,13 +60,17 @@ private String clientName() {
5860
}
5961

6062
private boolean supportsUnixDomainSocket() {
61-
return isNetty();
63+
return isNetty() || isJetty();
6264
}
6365

6466
private boolean isNetty() {
6567
return clientName().equals(NettyHttpClientBuilder.class.getSimpleName());
6668
}
6769

70+
private boolean isJetty() {
71+
return clientName().equals(JettyHttpClientBuilder.class.getSimpleName());
72+
}
73+
6874
@AfterEach
6975
void afterEach() {
7076
if (server != null) {
@@ -338,6 +344,10 @@ public void beforeEach(ExtensionContext context) {
338344
((HttpClientContractTest) context.getRequiredTestInstance())
339345
.useBuilder(new ApacheAsyncHttpClientBuilder());
340346
break;
347+
case JETTY:
348+
((HttpClientContractTest) context.getRequiredTestInstance())
349+
.useBuilder(new JettyHttpClientBuilder());
350+
break;
341351
default:
342352
throw new IllegalStateException("Unsupported implementation: " + implementationValue);
343353
}

0 commit comments

Comments
 (0)