Skip to content

Commit f6219c8

Browse files
authored
Merge pull request #228 from iksaif/corentin.chary/unix-stream
2 parents edb30df + a99b508 commit f6219c8

15 files changed

Lines changed: 755 additions & 72 deletions

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# CHANGELOG
22

3+
## 4.3.0 / 2024.XX.XX
4+
5+
* [FEATURE] Add support for `SOCK_STREAM` Unix sockets. See [#228][]
6+
37
## 4.2.1 / 2023.03.10
48

59
* [FEATURE] Add support for `DD_DOGSTATSD_URL`. See [#217][]
@@ -232,6 +236,7 @@ Fork from [indeedeng/java-dogstatsd-client] (https://github.com/indeedeng/java-d
232236
[#203]: https://github.com/DataDog/java-dogstatsd-client/issues/203
233237
[#211]: https://github.com/DataDog/java-dogstatsd-client/issues/211
234238
[#217]: https://github.com/DataDog/java-dogstatsd-client/issues/217
239+
[#228]: https://github.com/DataDog/java-dogstatsd-client/pull/228
235240

236241
[@PatrickAuld]: https://github.com/PatrickAuld
237242
[@blevz]: https://github.com/blevz

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,23 @@ The client jar is distributed via Maven central, and can be downloaded [from Mav
2323

2424
### Unix Domain Socket support
2525

26-
As an alternative to UDP, Agent v6 can receive metrics via a UNIX Socket (on Linux only). This library supports transmission via this protocol. To use it, pass the socket path as a hostname, and `0` as port.
26+
As an alternative to UDP, Agent v6 can receive metrics via a UNIX Socket (on Linux only). This library supports transmission via this protocol. To use it
27+
use the `address()` method of the builder and pass the path to the socket with the `unix://` prefix:
28+
29+
```java
30+
StatsDClient client = new NonBlockingStatsDClientBuilder()
31+
.address("unix:///var/run/datadog/dsd.socket")
32+
.build();
33+
```
2734

2835
By default, all exceptions are ignored, mimicking UDP behaviour. When using Unix Sockets, transmission errors trigger exceptions you can choose to handle by passing a `StatsDClientErrorHandler`:
2936

3037
- Connection error because of an invalid/missing socket triggers a `java.io.IOException: No such file or directory`.
3138
- If DogStatsD's reception buffer were to fill up and the non blocking client is used, the send times out after 100ms and throw either a `java.io.IOException: No buffer space available` or a `java.io.IOException: Resource temporarily unavailable`.
3239

40+
The default UDS transport is using `SOCK_DATAGRAM` sockets. We also have experimental support for `SOCK_STREAM` sockets which can
41+
be enabled by using the `unixstream://` instead of `unix://`. This is not recommended for production use at this time.
42+
3343
## Configuration
3444

3545
Once your DogStatsD client is installed, instantiate it in your code:

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.concurrent.Callable;
1919
import java.util.concurrent.ThreadFactory;
2020
import java.util.concurrent.ThreadLocalRandom;
21-
import java.util.concurrent.TimeUnit;
2221

2322

2423
/**
@@ -99,6 +98,7 @@ String tag() {
9998

10099
public static final boolean DEFAULT_ENABLE_AGGREGATION = true;
101100
public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true;
101+
public static final int SOCKET_CONNECT_TIMEOUT_MS = 1000;
102102

103103
public static final String CLIENT_TAG = "client:java";
104104
public static final String CLIENT_VERSION_TAG = "client_version:";
@@ -241,6 +241,9 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
241241
* The client tries to read the container ID by parsing the file /proc/self/cgroup.
242242
* This is not supported on Windows.
243243
* The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID.
244+
* @param connectionTimeout
245+
* the timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only.
246+
* It is also used to detect if a connection is still alive and re-establish a new one if needed.
244247
* @throws StatsDClientException
245248
* if the client could not be started
246249
*/
@@ -250,7 +253,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
250253
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
251254
final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval,
252255
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory,
253-
String containerID, final boolean originDetectionEnabled)
256+
String containerID, final boolean originDetectionEnabled, final int connectionTimeout)
254257
throws StatsDClientException {
255258

256259
if ((prefix != null) && (!prefix.isEmpty())) {
@@ -297,7 +300,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
297300
}
298301

299302
try {
300-
clientChannel = createByteChannel(addressLookup, timeout, bufferSize);
303+
clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize);
301304

302305
ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();
303306

@@ -316,7 +319,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
316319
telemetryClientChannel = clientChannel;
317320
telemetryStatsDProcessor = statsDProcessor;
318321
} else {
319-
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, bufferSize);
322+
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize);
320323

321324
// similar settings, but a single worker and non-blocking.
322325
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
@@ -377,7 +380,7 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr
377380
builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval,
378381
(builder.enableAggregation ? builder.aggregationFlushInterval : 0),
379382
builder.aggregationShards, builder.threadFactory, builder.containerID,
380-
builder.originDetectionEnabled);
383+
builder.originDetectionEnabled, builder.connectionTimeout);
381384
}
382385

383386
protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
@@ -478,11 +481,29 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) {
478481
return tagString(tags, constantTagsRendered, builder);
479482
}
480483

481-
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int bufferSize) throws Exception {
484+
ClientChannel createByteChannel(
485+
Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize)
486+
throws Exception {
482487
final SocketAddress address = addressLookup.call();
483488
if (address instanceof NamedPipeSocketAddress) {
484489
return new NamedPipeClientChannel((NamedPipeSocketAddress) address);
485490
}
491+
if (address instanceof UnixSocketAddressWithTransport) {
492+
UnixSocketAddressWithTransport unixAddr = ((UnixSocketAddressWithTransport) address);
493+
494+
// TODO: Maybe introduce a `UnixClientChannel` that can handle both stream and datagram sockets? This would
495+
// Allow us to support `unix://` for both kind of sockets like in go.
496+
switch (unixAddr.getTransportType()) {
497+
case UDS_STREAM:
498+
return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, bufferSize);
499+
case UDS_DATAGRAM:
500+
case UDS:
501+
return new UnixDatagramClientChannel(unixAddr.getAddress(), timeout, bufferSize);
502+
default:
503+
throw new IllegalArgumentException("Unsupported transport type: " + unixAddr.getTransportType());
504+
}
505+
}
506+
// We keep this for backward compatibility
486507
try {
487508
if (Class.forName("jnr.unixsocket.UnixSocketAddress").isInstance(address)) {
488509
return new UnixDatagramClientChannel(address, timeout, bufferSize);

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.timgroup.statsd;
22

3+
import jnr.constants.platform.Sock;
34
import jnr.unixsocket.UnixSocketAddress;
45

56
import java.net.InetAddress;
@@ -34,6 +35,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
3435
public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL;
3536
public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS;
3637
public boolean originDetectionEnabled = NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION;
38+
public int connectionTimeout = NonBlockingStatsDClient.SOCKET_CONNECT_TIMEOUT_MS;
3739

3840
public Callable<SocketAddress> addressLookup;
3941
public Callable<SocketAddress> telemetryAddressLookup;
@@ -71,6 +73,11 @@ public NonBlockingStatsDClientBuilder timeout(int val) {
7173
return this;
7274
}
7375

76+
public NonBlockingStatsDClientBuilder connectionTimeout(int val) {
77+
connectionTimeout = val;
78+
return this;
79+
}
80+
7481
public NonBlockingStatsDClientBuilder bufferPoolSize(int val) {
7582
bufferPoolSize = val;
7683
return this;
@@ -126,6 +133,16 @@ public NonBlockingStatsDClientBuilder namedPipe(String val) {
126133
return this;
127134
}
128135

136+
public NonBlockingStatsDClientBuilder address(String address) {
137+
addressLookup = getAddressLookupFromUrl(address);
138+
return this;
139+
}
140+
141+
public NonBlockingStatsDClientBuilder telemetryAddress(String address) {
142+
telemetryAddressLookup = getAddressLookupFromUrl(address);
143+
return this;
144+
}
145+
129146
public NonBlockingStatsDClientBuilder prefix(String val) {
130147
prefix = val;
131148
return this;
@@ -283,9 +300,12 @@ private Callable<SocketAddress> getAddressLookupFromUrl(String url) {
283300
return staticAddress(uriHost, uriPort);
284301
}
285302

286-
if (parsed.getScheme().equals("unix")) {
303+
if (parsed.getScheme().startsWith("unix")) {
287304
String uriPath = parsed.getPath();
288-
return staticAddress(uriPath, 0);
305+
return staticUnixResolution(
306+
uriPath,
307+
UnixSocketAddressWithTransport.TransportType.fromScheme(parsed.getScheme())
308+
);
289309
}
290310

291311
return null;
@@ -304,7 +324,10 @@ public static Callable<SocketAddress> volatileAddressResolution(final String hos
304324
if (port == 0) {
305325
return new Callable<SocketAddress>() {
306326
@Override public SocketAddress call() throws UnknownHostException {
307-
return new UnixSocketAddress(hostname);
327+
return new UnixSocketAddressWithTransport(
328+
new UnixSocketAddress(hostname),
329+
UnixSocketAddressWithTransport.TransportType.UDS
330+
);
308331
}
309332
};
310333
} else {
@@ -343,6 +366,17 @@ protected static Callable<SocketAddress> staticNamedPipeResolution(String namedP
343366
};
344367
}
345368

369+
protected static Callable<SocketAddress> staticUnixResolution(
370+
final String path,
371+
final UnixSocketAddressWithTransport.TransportType transportType) {
372+
return new Callable<SocketAddress>() {
373+
@Override public SocketAddress call() {
374+
final UnixSocketAddress socketAddress = new UnixSocketAddress(path);
375+
return new UnixSocketAddressWithTransport(socketAddress, transportType);
376+
}
377+
};
378+
}
379+
346380
private static Callable<SocketAddress> staticAddress(final String hostname, final int port) {
347381
try {
348382
return staticAddressResolution(hostname, port);
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.timgroup.statsd;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Objects;
5+
6+
public class UnixSocketAddressWithTransport extends SocketAddress {
7+
8+
private final SocketAddress address;
9+
private final TransportType transportType;
10+
11+
public enum TransportType {
12+
UDS_STREAM("uds-stream"),
13+
UDS_DATAGRAM("uds-datagram"),
14+
UDS("uds");
15+
16+
private final String transportType;
17+
18+
TransportType(String transportType) {
19+
this.transportType = transportType;
20+
}
21+
22+
String getTransportType() {
23+
return transportType;
24+
}
25+
26+
static TransportType fromScheme(String scheme) {
27+
switch (scheme) {
28+
case "unixstream":
29+
return UDS_STREAM;
30+
case "unixgram":
31+
return UDS_DATAGRAM;
32+
case "unix":
33+
return UDS;
34+
default:
35+
break;
36+
}
37+
throw new IllegalArgumentException("Unknown scheme: " + scheme);
38+
}
39+
}
40+
41+
public UnixSocketAddressWithTransport(final SocketAddress address, final TransportType transportType) {
42+
this.address = address;
43+
this.transportType = transportType;
44+
}
45+
46+
@Override
47+
public boolean equals(Object other) {
48+
if (this == other) {
49+
return true;
50+
}
51+
if (other == null || getClass() != other.getClass()) {
52+
return false;
53+
}
54+
UnixSocketAddressWithTransport that = (UnixSocketAddressWithTransport) other;
55+
return Objects.equals(address, that.address) && transportType == that.transportType;
56+
}
57+
58+
@Override
59+
public int hashCode() {
60+
return Objects.hash(address, transportType);
61+
}
62+
63+
SocketAddress getAddress() {
64+
return address;
65+
}
66+
67+
TransportType getTransportType() {
68+
return transportType;
69+
}
70+
}

0 commit comments

Comments
 (0)