Skip to content

Commit 00cbb2a

Browse files
committed
AMLII-2058 Use correct buffer size for unix sockets
When the client is configured to use UDS via builder's address() method, the client defaulted to UDP-sized buffer. This patch adds a method to the ClientChannel to fetch the default packet size for each transport type, and uses it in the client to pass the correct value to the processor. The maxPacketSizeBytes field of the resolved builder is reduced to only storing the user preferred configuration, if any.
1 parent cdf99d3 commit 00cbb2a

8 files changed

Lines changed: 54 additions & 10 deletions

src/main/java/com/timgroup/statsd/ClientChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@
44

55
interface ClientChannel extends WritableByteChannel {
66
String getTransportType();
7+
8+
int getMaxPacketSizeBytes();
79
}

src/main/java/com/timgroup/statsd/DatagramClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ public String getTransportType() {
5252
public String toString() {
5353
return "[" + getTransportType() + "] " + address;
5454
}
55+
56+
@Override
57+
public int getMaxPacketSizeBytes() {
58+
return NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
59+
}
5560
}

src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,9 @@ public String getTransportType() {
4848
public String toString() {
4949
return pipe;
5050
}
51+
52+
@Override
53+
public int getMaxPacketSizeBytes() {
54+
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
55+
}
5156
}

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
174174
protected final StatsDSender statsDSender;
175175
protected StatsDSender telemetryStatsDSender;
176176
protected final Telemetry telemetry;
177-
177+
private final int maxPacketSizeBytes;
178178
private final boolean blocking;
179179

180180
/**
@@ -268,6 +268,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
268268
}
269269

270270
this.blocking = blocking;
271+
this.maxPacketSizeBytes = maxPacketSizeBytes;
272+
271273
{
272274
List<String> costantPreTags = new ArrayList<>();
273275
if (constantTags != null) {
@@ -300,7 +302,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
300302

301303
ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();
302304

303-
statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
305+
statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize,
304306
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID);
305307

306308
Properties properties = new Properties();
@@ -318,7 +320,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
318320
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize);
319321

320322
// similar settings, but a single worker and non-blocking.
321-
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
323+
telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel),
322324
poolSize, 1, false, 0, aggregationShards, threadFactory, containerID);
323325
}
324326

@@ -1340,4 +1342,8 @@ private String getContainerID(String containerID, boolean originDetectionEnabled
13401342

13411343
return null;
13421344
}
1345+
1346+
private int getPacketSize(ClientChannel chan) {
1347+
return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes();
1348+
}
13431349
}

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -236,14 +236,8 @@ protected NonBlockingStatsDClientBuilder resolve() {
236236
throw new UnsupportedOperationException("clone");
237237
}
238238

239-
int packetSize = maxPacketSizeBytes;
240239
Callable<SocketAddress> lookup = getAddressLookup();
241240

242-
if (packetSize == 0) {
243-
packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
244-
NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
245-
}
246-
247241
Callable<SocketAddress> telemetryLookup = telemetryAddressLookup;
248242
if (telemetryLookup == null) {
249243
if (telemetryHostname == null) {
@@ -253,7 +247,6 @@ protected NonBlockingStatsDClientBuilder resolve() {
253247
}
254248
}
255249

256-
resolved.maxPacketSizeBytes = packetSize;
257250
resolved.addressLookup = lookup;
258251
resolved.telemetryAddressLookup = telemetryLookup;
259252

src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ class UnixDatagramClientChannel extends DatagramClientChannel {
3131
public String getTransportType() {
3232
return "uds";
3333
}
34+
35+
@Override
36+
public int getMaxPacketSizeBytes() {
37+
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
38+
}
3439
}

src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,9 @@ public String getTransportType() {
175175
public String toString() {
176176
return "[" + getTransportType() + "] " + address;
177177
}
178+
179+
@Override
180+
public int getMaxPacketSizeBytes() {
181+
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
182+
}
178183
}

src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,27 @@ public void resist_dsd_timeout() throws Exception {
171171
assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g"));
172172
server.clear();
173173
}
174+
175+
@Test(timeout = 5000L)
176+
public void stream_uds_has_uds_buffer_size() throws Exception {
177+
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
178+
.prefix("my.prefix")
179+
.address("unixstream:///foo")
180+
.containerID("fake-container-id")
181+
.build();
182+
183+
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
184+
}
185+
186+
@Test(timeout = 5000L)
187+
public void max_packet_size_override() throws Exception {
188+
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
189+
.prefix("my.prefix")
190+
.address("unixstream:///foo")
191+
.containerID("fake-container-id")
192+
.maxPacketSizeBytes(576)
193+
.build();
194+
195+
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576);
196+
}
174197
}

0 commit comments

Comments
 (0)