Skip to content

Commit 03043d6

Browse files
committed
Add Unix domain socket support
This change adds Unix domain socket support. The sync client uses JUnixSocket, which provides synchronous UDS support through the legacy `java.net.Socket` API; the async client uses the JEP 380 implementation of UDS through the `java.nio.channels.SocketChannel` API (requires JDK16 or later). Since the synchronous client is tightly coupled to the `Socket` API, we can't trivially use JEP 380 UDS support. We would first have to write an adapter to implement the `Socket` API, backed by a JEP 380 UDS `SocketChannel`. This would require us to implement features like socket option configuration, connection timeouts, and socket timeouts; we would also have to implement APIs like `getInetAddress()` which don't actually make sense in a UDS context. This is probably doable (JUnixSocket does it, albeit with a different implementation strategy based on native code), but it's not trivial. The asynchronous client is the other way around: it supports JEP 380, but not JUnixSocket. The issue here is more subtle: JDK and JUnixDomain channels cannot be mixed in the same selector, and since JUnixDomain does not provide an implementation of TCP/IP channels, supporting JUnixSocket in the async client would require substantial rework in the IO reactor. Since JDK8 is end-of-life next year, I doubt this is worth doing unless we can find some clever way of integrating the new channel type with minimal churn. Unix domain socket support is exposed through the `RequestConfig` API. A path to a Unix domain socket can be provided as a client-wide default through `setDefaultRequestConfig`, or on a per-request basis through `setConfig`. Currently, proxies and TLS are not supported through UDS. The former feature seems unnecessary, but the latter is likely worth adding at some point, since contacting an HTTPS endpoint over UDS (sometimes denoted by the URI scheme `https+unix`) is not unheard of.
1 parent c5f98fb commit 03043d6

25 files changed

Lines changed: 1010 additions & 55 deletions

httpclient5-testing/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@
7777
<artifactId>httpclient5-fluent</artifactId>
7878
<scope>test</scope>
7979
</dependency>
80+
<dependency>
81+
<groupId>com.kohlschutter.junixsocket</groupId>
82+
<artifactId>junixsocket-core</artifactId>
83+
<scope>test</scope>
84+
<type>pom</type>
85+
</dependency>
8086
<dependency>
8187
<groupId>org.junit.jupiter</groupId>
8288
<artifactId>junit-jupiter-params</artifactId>
@@ -153,4 +159,4 @@
153159
</plugins>
154160
</reporting>
155161

156-
</project>
162+
</project>

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/StandardTestClientBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
package org.apache.hc.client5.testing.extension.sync;
2929

30+
import java.nio.file.Path;
3031
import java.util.Collection;
3132

3233
import org.apache.hc.client5.http.AuthenticationStrategy;
@@ -36,6 +37,7 @@
3637
import org.apache.hc.client5.http.auth.CredentialsProvider;
3738
import org.apache.hc.client5.http.classic.ExecChainHandler;
3839
import org.apache.hc.client5.http.config.ConnectionConfig;
40+
import org.apache.hc.client5.http.config.RequestConfig;
3941
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
4042
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
4143
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
@@ -157,6 +159,14 @@ public TestClientBuilder setDefaultCredentialsProvider(final CredentialsProvider
157159
return this;
158160
}
159161

162+
@Override
163+
public TestClientBuilder setUnixDomainSocket(final Path unixDomainSocket) {
164+
this.clientBuilder.setDefaultRequestConfig(RequestConfig.custom()
165+
.setUnixDomainSocket(unixDomainSocket)
166+
.build());
167+
return this;
168+
}
169+
160170
@Override
161171
public TestClient build() throws Exception {
162172
final HttpClientConnectionManager connectionManagerCopy = connectionManager != null ? connectionManager :

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
package org.apache.hc.client5.testing.extension.sync;
2929

30+
import java.nio.file.Path;
3031
import java.util.Collection;
3132

3233
import org.apache.hc.client5.http.AuthenticationStrategy;
@@ -103,6 +104,10 @@ default TestClientBuilder setDefaultCredentialsProvider(CredentialsProvider cred
103104
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
104105
}
105106

107+
default TestClientBuilder setUnixDomainSocket(Path unixDomainSocket) {
108+
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
109+
}
110+
106111
TestClient build() throws Exception;
107112

108113
}

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestClientResources.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class TestClientResources implements AfterEachCallback {
5050

5151
private TestServer server;
5252
private TestClient client;
53+
private UnixDomainProxyServer udsProxy;
5354

5455
public TestClientResources(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final Timeout timeout) {
5556
this.scheme = scheme != null ? scheme : URIScheme.HTTP;
@@ -74,6 +75,9 @@ public void afterEach(final ExtensionContext extensionContext) {
7475
if (client != null) {
7576
client.close(CloseMode.GRACEFUL);
7677
}
78+
if (udsProxy != null) {
79+
udsProxy.close();
80+
}
7781
if (server != null) {
7882
server.shutdown(CloseMode.IMMEDIATE);
7983
}
@@ -99,6 +103,15 @@ public TestServer server() throws Exception {
99103
return server;
100104
}
101105

106+
public UnixDomainProxyServer udsProxy() throws Exception {
107+
if (udsProxy == null) {
108+
final TestServer testServer = server();
109+
final int port = testServer.getServerAddress().getPort();
110+
udsProxy = new UnixDomainProxyServer(port);
111+
}
112+
return udsProxy;
113+
}
114+
102115
public void configureClient(final Consumer<TestClientBuilder> clientCustomizer) {
103116
Asserts.check(client == null, "Client is already running and cannot be changed");
104117
clientCustomizer.accept(clientBuilder);

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/sync/TestServer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class TestServer {
4343
private final Http1Config http1Config;
4444
private final HttpProcessor httpProcessor;
4545
private final Decorator<HttpServerRequestHandler> exchangeHandlerDecorator;
46+
private volatile InetSocketAddress serverAddress;
4647

4748
TestServer(
4849
final ClassicTestServer server,
@@ -64,7 +65,14 @@ public InetSocketAddress start() throws IOException {
6465
server.configure(exchangeHandlerDecorator);
6566
server.configure(httpProcessor);
6667
server.start();
67-
return new InetSocketAddress(server.getInetAddress(), server.getPort());
68+
serverAddress = new InetSocketAddress(server.getInetAddress(), server.getPort());
69+
return serverAddress;
6870
}
6971

72+
public InetSocketAddress getServerAddress() {
73+
if (serverAddress == null) {
74+
throw new IllegalStateException("Server has not been started");
75+
}
76+
return serverAddress;
77+
}
7078
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
28+
package org.apache.hc.client5.testing.extension.sync;
29+
30+
import org.newsclub.net.unix.AFUNIXServerSocket;
31+
import org.newsclub.net.unix.AFUNIXSocket;
32+
33+
import java.io.File;
34+
import java.io.IOException;
35+
import java.io.InputStream;
36+
import java.io.OutputStream;
37+
import java.net.Socket;
38+
import java.nio.file.Files;
39+
import java.nio.file.Path;
40+
import java.util.concurrent.CompletableFuture;
41+
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
44+
import java.util.concurrent.TimeUnit;
45+
46+
import static java.util.concurrent.CompletableFuture.supplyAsync;
47+
48+
public final class UnixDomainProxyServer {
49+
private final int port;
50+
private final ExecutorService executorService;
51+
private final Path socketPath;
52+
private final CountDownLatch serverReady = new CountDownLatch(1);
53+
private volatile AFUNIXServerSocket serverSocket;
54+
55+
public UnixDomainProxyServer(final int port) {
56+
this.port = port;
57+
this.executorService = Executors.newCachedThreadPool();
58+
this.socketPath = (new File("proxy.sock")).toPath();
59+
}
60+
61+
public void start() {
62+
executorService.submit(this::runUdsProxy);
63+
try {
64+
serverReady.await();
65+
} catch (final InterruptedException ex) {
66+
throw new RuntimeException(ex);
67+
}
68+
}
69+
70+
public Path getSocketPath() {
71+
return socketPath;
72+
}
73+
74+
public void close() {
75+
try {
76+
serverSocket.close();
77+
executorService.shutdownNow();
78+
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
79+
throw new RuntimeException("Failed to shut down");
80+
}
81+
Files.deleteIfExists(socketPath);
82+
} catch (final InterruptedException | IOException ex) {
83+
throw new RuntimeException(ex);
84+
}
85+
}
86+
87+
private void runUdsProxy() {
88+
try {
89+
Files.deleteIfExists(socketPath);
90+
} catch (final IOException ignore) {
91+
}
92+
93+
try {
94+
try (final AFUNIXServerSocket server = AFUNIXServerSocket.bindOn(socketPath, true)) {
95+
this.serverSocket = server;
96+
serverReady.countDown();
97+
serveRequests(server);
98+
} catch (final Throwable ignore) {
99+
}
100+
} catch (final Throwable t) {
101+
serverReady.countDown();
102+
throw t;
103+
}
104+
}
105+
106+
private void serveRequests(final AFUNIXServerSocket server) throws IOException {
107+
while (true) {
108+
final AFUNIXSocket udsClient = server.accept();
109+
final Socket tcpSocket = new Socket("localhost", port);
110+
final CompletableFuture<Void> f1 = supplyAsync(() -> pipe(udsClient, tcpSocket), executorService);
111+
final CompletableFuture<Void> f2 = supplyAsync(() -> pipe(tcpSocket, udsClient), executorService);
112+
CompletableFuture.allOf(f1, f2).whenComplete((result, ex) -> {
113+
try {
114+
udsClient.close();
115+
tcpSocket.close();
116+
} catch (final IOException ignore) {
117+
}
118+
});
119+
}
120+
}
121+
122+
private Void pipe(final Socket inputSocket, final Socket outputSocket) {
123+
try (
124+
final InputStream in = inputSocket.getInputStream();
125+
final OutputStream out = outputSocket.getOutputStream()
126+
) {
127+
final byte[] buf = new byte[8192];
128+
int len;
129+
while ((len = in.read(buf)) != -1) {
130+
out.write(buf, 0, len);
131+
out.flush();
132+
}
133+
} catch (final IOException ignore) {
134+
}
135+
return null;
136+
}
137+
}

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/AbstractIntegrationTestBase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
package org.apache.hc.client5.testing.sync;
2929

3030
import java.net.InetSocketAddress;
31+
import java.nio.file.Path;
3132
import java.util.function.Consumer;
3233

3334
import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel;
@@ -47,9 +48,16 @@ abstract class AbstractIntegrationTestBase {
4748

4849
@RegisterExtension
4950
private final TestClientResources testResources;
51+
private final boolean useUnixDomainSocket;
5052

5153
protected AbstractIntegrationTestBase(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel) {
54+
this(scheme, clientProtocolLevel, false);
55+
}
56+
57+
protected AbstractIntegrationTestBase(
58+
final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final boolean useUnixDomainSocket) {
5259
this.testResources = new TestClientResources(scheme, clientProtocolLevel, TIMEOUT);
60+
this.useUnixDomainSocket = useUnixDomainSocket;
5361
}
5462

5563
public URIScheme scheme() {
@@ -67,6 +75,9 @@ public void configureServer(final Consumer<TestServerBootstrap> serverCustomizer
6775
public HttpHost startServer() throws Exception {
6876
final TestServer server = testResources.server();
6977
final InetSocketAddress inetSocketAddress = server.start();
78+
if (useUnixDomainSocket) {
79+
testResources.udsProxy().start();
80+
}
7081
return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort());
7182
}
7283

@@ -75,6 +86,12 @@ public void configureClient(final Consumer<TestClientBuilder> clientCustomizer)
7586
}
7687

7788
public TestClient client() throws Exception {
89+
if (useUnixDomainSocket) {
90+
final Path socketPath = testResources.udsProxy().getSocketPath();
91+
testResources.configureClient(builder -> {
92+
builder.setUnixDomainSocket(socketPath);
93+
});
94+
}
7895
return testResources.client();
7996
}
8097

0 commit comments

Comments
 (0)