Skip to content

Commit 95843b6

Browse files
committed
More robust implementation of a stream server
1 parent fcd5d4d commit 95843b6

4 files changed

Lines changed: 75 additions & 37 deletions

File tree

src/test/java/com/timgroup/statsd/DummyStatsDServer.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,7 @@ public void run() {
3535
((Buffer)packet).clear(); // Cast necessary to handle Java9 covariant return types
3636
// see: https://jira.mongodb.org/browse/JAVA-2559 for ref.
3737
receive(packet);
38-
packetsReceived.addAndGet(1);
39-
40-
packet.flip();
41-
for (String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) {
42-
addMessage(msg);
43-
}
38+
handlePacket(packet);
4439
} catch (IOException e) {
4540
}
4641
}
@@ -51,6 +46,25 @@ public void run() {
5146
thread.start();
5247
}
5348

49+
protected boolean sleepIfFrozen() {
50+
if (freeze) {
51+
try {
52+
Thread.sleep(10);
53+
} catch (InterruptedException e) {
54+
}
55+
}
56+
return freeze;
57+
}
58+
59+
protected void handlePacket(ByteBuffer packet) {
60+
packetsReceived.addAndGet(1);
61+
62+
packet.flip();
63+
for (String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) {
64+
addMessage(msg);
65+
}
66+
}
67+
5468
public void waitForMessage() {
5569
waitForMessage(null);
5670
}

src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package com.timgroup.statsd;
22

33
import java.io.IOException;
4+
import java.nio.Buffer;
45
import java.nio.ByteBuffer;
56
import java.nio.channels.DatagramChannel;
7+
import java.nio.charset.StandardCharsets;
68
import jnr.unixsocket.UnixDatagramChannel;
79
import jnr.unixsocket.UnixSocketAddress;
810

11+
import static com.timgroup.statsd.NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
12+
913
public class UnixDatagramSocketDummyStatsDServer extends DummyStatsDServer {
1014
private final DatagramChannel server;
1115

@@ -14,12 +18,12 @@ public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException
1418
server.bind(new UnixSocketAddress(socketPath));
1519
this.listen();
1620
}
21+
1722
@Override
1823
protected boolean isOpen() {
1924
return server.isOpen();
2025
}
2126

22-
@Override
2327
protected void receive(ByteBuffer packet) throws IOException {
2428
server.receive(packet);
2529
}

src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.timgroup.statsd;
22

3+
import com.sun.xml.internal.ws.api.message.Packet;
34
import java.io.IOException;
45
import java.nio.Buffer;
56
import java.nio.ByteBuffer;
@@ -22,14 +23,14 @@
2223
public class UnixStreamSocketDummyStatsDServer extends DummyStatsDServer {
2324
private final UnixServerSocketChannel server;
2425
private final ConcurrentLinkedQueue<UnixSocketChannel> channels = new ConcurrentLinkedQueue<>();
26+
private final ConcurrentLinkedQueue<Packet> packets = new ConcurrentLinkedQueue<>();
2527

2628
private final Logger logger = Logger.getLogger(UnixStreamSocketDummyStatsDServer.class.getName());
2729

2830
public UnixStreamSocketDummyStatsDServer(String socketPath) throws IOException {
2931
server = UnixServerSocketChannel.open();
30-
server.configureBlocking(false);
32+
server.configureBlocking(true);
3133
server.socket().bind(new UnixSocketAddress(socketPath));
32-
this.accept();
3334
this.listen();
3435
}
3536

@@ -38,20 +39,23 @@ protected boolean isOpen() {
3839
return server.isOpen();
3940
}
4041

41-
protected void accept() {
42+
@Override
43+
protected void receive(ByteBuffer packet) throws IOException {
44+
// This is unused because we re-implement listen() to fit our needs
45+
}
46+
47+
@Override
48+
protected void listen() {
49+
logger.info("Listening on " + server.getLocalSocketAddress());
4250
Thread thread = new Thread(new Runnable() {
4351
@Override
4452
public void run() {
45-
final ByteBuffer packet = ByteBuffer.allocate(DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
46-
4753
while(isOpen()) {
48-
if (freeze) {
49-
try {
50-
Thread.sleep(10);
51-
} catch (InterruptedException e) {
52-
}
53-
} else {
54+
if (sleepIfFrozen()) {
55+
continue;
56+
}
5457
try {
58+
logger.info("Waiting for connection");
5559
UnixSocketChannel clientChannel = server.accept();
5660
if (clientChannel != null) {
5761
clientChannel.configureBlocking(true);
@@ -61,33 +65,54 @@ public void run() {
6165
logger.warning("Failed to get remote socket address");
6266
}
6367
channels.add(clientChannel);
68+
readChannel(clientChannel);
6469
}
6570
} catch (IOException e) {
6671
}
67-
}
6872
}
6973
}
7074
});
7175
thread.setDaemon(true);
7276
thread.start();
7377
}
7478

75-
@Override
76-
protected void receive(ByteBuffer packet) throws IOException {
77-
for (UnixSocketChannel channel : channels) {
78-
if (channel.isConnected()) {
79-
if (readPacket(channel, packet)) {
80-
return;
79+
public void readChannel(final UnixSocketChannel clientChannel) {
80+
logger.info("Reading from " + clientChannel);
81+
Thread thread = new Thread(new Runnable() {
82+
@Override
83+
public void run() {
84+
final ByteBuffer packet = ByteBuffer.allocate(DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
85+
86+
while(clientChannel.isOpen()) {
87+
if (sleepIfFrozen()) {
88+
continue;
89+
}
90+
((Buffer)packet).clear(); // Cast necessary to handle Java9 covariant return types
91+
// see: https://jira.mongodb.org/browse/JAVA-2559 for ref.
92+
if (readPacket(clientChannel, packet)) {
93+
handlePacket(packet);
94+
} else {
95+
try {
96+
clientChannel.close();
97+
} catch (IOException e) {
98+
logger.warning("Failed to close channel: " + e);
99+
}
100+
}
101+
81102
}
103+
logger.info("Disconnected from " + clientChannel);
82104
}
83-
}
105+
});
106+
thread.setDaemon(true);
107+
thread.start();
84108
}
85109

86-
private boolean readPacket(SocketChannel channel, ByteBuffer packet) throws IOException {
110+
private boolean readPacket(SocketChannel channel, ByteBuffer packet) {
87111
try {
88112
ByteBuffer delimiterBuffer = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).order(ByteOrder.LITTLE_ENDIAN);
89113

90114
int read = channel.read(delimiterBuffer);
115+
91116
delimiterBuffer.flip();
92117
if (read <= 0) {
93118
// There was nothing to read
@@ -100,16 +125,11 @@ private boolean readPacket(SocketChannel channel, ByteBuffer packet) throws IOEx
100125
}
101126

102127
packet.limit(packetSize);
103-
long deadline = System.nanoTime() + 1_000L * 1_000_000L;
104-
while (packet.hasRemaining()) {
105-
read = channel.read(packet);
106-
if (deadline < System.nanoTime()) {
107-
channel.close();
108-
}
128+
while (packet.hasRemaining() && channel.isConnected()) {
129+
channel.read(packet);
109130
}
110131
return true;
111132
} catch (IOException e) {
112-
channel.close();
113133
return false;
114134
}
115135
}

src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public SocketAddress call() throws Exception {
6262
.addressLookup(addressLookup)
6363
.port(0)
6464
.queueSize(1)
65-
.timeout(1) // non-zero timeout to ensure exception triggered if socket buffer full.
65+
.timeout(500) // non-zero timeout to ensure exception triggered if socket buffer full.
6666
.connectionTimeout(500)
6767
.socketBufferSize(1024 * 1024)
6868
.enableAggregation(false)
@@ -74,7 +74,7 @@ public SocketAddress call() throws Exception {
7474
.addressLookup(addressLookup)
7575
.port(0)
7676
.queueSize(1)
77-
.timeout(1) // non-zero timeout to ensure exception triggered if socket buffer full.
77+
.timeout(500) // non-zero timeout to ensure exception triggered if socket buffer full.
7878
.connectionTimeout(500)
7979
.socketBufferSize(1024 * 1024)
8080
.enableAggregation(false)
@@ -164,7 +164,7 @@ public void resist_dsd_timeout() throws Exception {
164164

165165
while (lastException.getMessage() == null) {
166166
client.gauge("mycount", 20);
167-
Thread.sleep(10); // We need to fill the buffer, setting a shorter sleep
167+
168168
}
169169
String excMessage = "Write timed out";
170170
assertThat(lastException.getMessage(), containsString(excMessage));

0 commit comments

Comments
 (0)