Skip to content

Commit e96e26c

Browse files
committed
Add option to enable jdk support for UDS
1 parent d936ead commit e96e26c

4 files changed

Lines changed: 23 additions & 26 deletions

File tree

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ String tag() {
9696
public static final int SOCKET_BUFFER_BYTES = -1;
9797
public static final boolean DEFAULT_BLOCKING = false;
9898
public static final boolean DEFAULT_ENABLE_TELEMETRY = true;
99+
public static final boolean DEFAULT_ENABLE_JDK_SOCKET = true;
99100

100101
public static final boolean DEFAULT_ENABLE_AGGREGATION = true;
101102
public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true;
@@ -243,12 +244,7 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder)
243244
externalEnv = originDetectionEnabled ? Utf8.sanitize(System.getenv("DD_EXTERNAL_ENV")) : "";
244245

245246
try {
246-
clientChannel =
247-
createByteChannel(
248-
builder.addressLookup,
249-
builder.timeout,
250-
builder.connectionTimeout,
251-
builder.socketBufferSize);
247+
clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket);
252248

253249
ThreadFactory threadFactory =
254250
builder.threadFactory != null
@@ -291,12 +287,7 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder)
291287
telemetryClientChannel = clientChannel;
292288
telemetryStatsDProcessor = statsDProcessor;
293289
} else {
294-
telemetryClientChannel =
295-
createByteChannel(
296-
builder.telemetryAddressLookup,
297-
builder.timeout,
298-
builder.connectionTimeout,
299-
builder.socketBufferSize);
290+
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket);
300291

301292
// similar settings, but a single worker and non-blocking.
302293
telemetryStatsDProcessor =
@@ -479,10 +470,7 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) {
479470
}
480471

481472
ClientChannel createByteChannel(
482-
Callable<SocketAddress> addressLookup,
483-
int timeout,
484-
int connectionTimeout,
485-
int bufferSize)
473+
Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket)
486474
throws Exception {
487475
final SocketAddress address = addressLookup.call();
488476
if (address instanceof NamedPipeSocketAddress) {
@@ -496,8 +484,7 @@ ClientChannel createByteChannel(
496484
// Allow us to support `unix://` for both kind of sockets like in go.
497485
switch (unixAddr.getTransportType()) {
498486
case UDS_STREAM:
499-
return new UnixStreamClientChannel(
500-
unixAddr.getAddress(), timeout, connectionTimeout, bufferSize);
487+
return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, bufferSize, enableJdkSocket);
501488
case UDS_DATAGRAM:
502489
case UDS:
503490
return new UnixDatagramClientChannel(

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
5757

5858
public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION;
5959

60+
/** Enable native JDK support for UDS. */
61+
public boolean enableJdkSocket = NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET;
62+
6063
/** Telemetry flush interval, in milliseconds. */
6164
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
6265

@@ -327,6 +330,11 @@ public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) {
327330
return this;
328331
}
329332

333+
public NonBlockingStatsDClientBuilder enableJdkSocket(boolean val) {
334+
enableJdkSocket = val;
335+
return this;
336+
}
337+
330338
/**
331339
* Request that all metrics from this client to be enriched to specified tag cardinality.
332340
*
@@ -529,7 +537,7 @@ protected static Callable<SocketAddress> staticUnixResolution(
529537
@Override public SocketAddress call() {
530538
SocketAddress socketAddress;
531539
// Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise.
532-
if (VersionUtils.isJavaVersionAtLeast(16)) {
540+
if (VersionUtils.isJavaVersionAtLeast(16) && NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET) {
533541
try {
534542
// Use reflection to avoid compiling Java 16+ classes in incompatible versions
535543
Class<?> unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress");

src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class UnixStreamClientChannel implements ClientChannel {
1717
private final int timeout;
1818
private final int connectionTimeout;
1919
private final int bufferSize;
20+
private final boolean enableJdkSocket;
2021

2122
private SocketChannel delegate;
2223
private final ByteBuffer delimiterBuffer =
@@ -27,16 +28,15 @@ public class UnixStreamClientChannel implements ClientChannel {
2728
*
2829
* @param address Location of named pipe
2930
*/
30-
UnixStreamClientChannel(
31-
SocketAddress address, int timeout, int connectionTimeout, int bufferSize)
32-
throws IOException {
31+
UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws IOException {
3332
this.delegate = null;
3433
this.address = address;
3534
System.out.println("========== Constructor address: " + address);
3635
System.out.println("========== Constructor address type: " + address.getClass().getName());
3736
this.timeout = timeout;
3837
this.connectionTimeout = connectionTimeout;
3938
this.bufferSize = bufferSize;
39+
this.enableJdkSocket = enableJdkSocket;
4040
}
4141

4242
@Override
@@ -133,7 +133,7 @@ private void connect() throws IOException {
133133

134134
long deadline = System.nanoTime() + connectionTimeout * 1_000_000L;
135135
// Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise.
136-
if (VersionUtils.isJavaVersionAtLeast(16)) {
136+
if (VersionUtils.isJavaVersionAtLeast(16) && enableJdkSocket) {
137137
try {
138138
// Use reflection to avoid compiling Java 16+ classes in incompatible versions
139139
Class<?> protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily");
@@ -180,7 +180,7 @@ private void connect() throws IOException {
180180
throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e);
181181
}
182182
}
183-
// Default to jnr-unixsocket if Java version is less than 16
183+
// Default to jnr-unixsocket if Java version is less than 16 or native UDS support is disabled
184184
UnixSocketChannel channel = UnixSocketChannel.create();
185185

186186
if (connectionTimeout > 0) {

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,7 +1285,8 @@ ClientChannel createByteChannel(
12851285
Callable<SocketAddress> addressLookup,
12861286
int timeout,
12871287
int connectionTimeout,
1288-
int bufferSize)
1288+
int bufferSize,
1289+
boolean enableJdkSocket)
12891290
throws Exception {
12901291
return new DatagramClientChannel(addressLookup.call()) {
12911292
@Override
@@ -1336,7 +1337,8 @@ ClientChannel createByteChannel(
13361337
Callable<SocketAddress> addressLookup,
13371338
int timeout,
13381339
int connectionTimeout,
1339-
int bufferSize)
1340+
int bufferSize,
1341+
boolean enableJdkSocket)
13401342
throws Exception {
13411343
return new DatagramClientChannel(addressLookup.call()) {
13421344
@Override

0 commit comments

Comments
 (0)