1111
1212public class StatsDNonBlockingProcessor extends StatsDProcessor {
1313
14- private final Queue <Message > messages ;
14+ private final Queue <Message >[] messages ;
15+ private final Queue <Integer >[] processorWorkQueue ;
16+
1517 private final int qcapacity ;
16- private final AtomicInteger qsize ; // qSize will not reflect actual size, but a close estimate.
18+ private final AtomicInteger [] qsize ; // qSize will not reflect actual size, but a close estimate.
19+
20+ private class ProcessingTask implements Runnable {
21+ private final int processorQueueId ;
22+
23+ public ProcessingTask (int id ) {
24+ this .processorQueueId = id ;
25+ }
26+
27+ public void run () {
28+ boolean empty ;
29+ ByteBuffer sendBuffer ;
30+
31+ try {
32+ sendBuffer = bufferPool .borrow ();
33+ } catch (final InterruptedException e ) {
34+ handler .handle (e );
35+ return ;
36+ }
37+
38+ while (!((empty = processorWorkQueue [this .processorQueueId ].isEmpty ()) && shutdown )) {
39+
40+ try {
41+ if (empty ) {
42+ Thread .sleep (WAIT_SLEEP_MS );
43+ continue ;
44+ }
45+
46+ if (Thread .interrupted ()) {
47+ return ;
48+ }
49+
50+ final int messageQueueIdx = processorWorkQueue [this .processorQueueId ].poll ();
51+ final String message = messages [messageQueueIdx ].poll ();
52+ if (message != null ) {
53+ qsize [messageQueueIdx ].decrementAndGet ();
54+ final byte [] data = message .getBytes (MESSAGE_CHARSET );
55+ if (sendBuffer .capacity () < data .length ) {
56+ throw new InvalidMessageException (MESSAGE_TOO_LONG , message );
57+ }
58+ if (sendBuffer .remaining () < (data .length + 1 )) {
59+ outboundQueue .put (sendBuffer );
60+ sendBuffer = bufferPool .borrow ();
61+ }
62+ if (sendBuffer .position () > 0 ) {
63+ sendBuffer .put ((byte ) '\n' );
64+ }
65+ sendBuffer .put (data );
66+ if (null == processorWorkQueue [this .processorQueueId ].peek ()) {
67+ outboundQueue .put (sendBuffer );
68+ sendBuffer = bufferPool .borrow ();
69+ }
70+ }
71+ } catch (final InterruptedException e ) {
72+ if (shutdown ) {
73+ endSignal .countDown ();
74+ return ;
75+ }
76+ } catch (final Exception e ) {
77+ handler .handle (e );
78+ }
79+ }
80+ endSignal .countDown ();
81+ }
82+ }
1783
1884 private class ProcessingTask extends StatsDProcessor .ProcessingTask {
1985
@@ -93,13 +159,23 @@ public void run() {
93159 }
94160
95161 StatsDNonBlockingProcessor (final int queueSize , final StatsDClientErrorHandler handler ,
96- final int maxPacketSizeBytes , final int poolSize , final int workers )
97- throws Exception {
162+ final int maxPacketSizeBytes , final int poolSize , final int workers ,
163+ final int lockShardGrain ) throws Exception {
98164
99- super (queueSize , handler , maxPacketSizeBytes , poolSize , workers );
100- this .qsize = new AtomicInteger ( 0 ) ;
165+ super (queueSize , handler , maxPacketSizeBytes , poolSize , workers , lockShardGrain );
166+ this .qsize = new AtomicInteger [ lockShardGrain ] ;
101167 this .qcapacity = queueSize ;
102- this .messages = new ConcurrentLinkedQueue <>();
168+ this .messages = new ConcurrentLinkedQueue [lockShardGrain ];
169+ for (int i = 0 ; i < lockShardGrain ; i ++) {
170+ this .qsize [i ] = new AtomicInteger ();
171+ this .messages [i ] = new ConcurrentLinkedQueue <Message >();
172+ this .qsize [i ].set (0 );
173+ }
174+
175+ this .processorWorkQueue = new ConcurrentLinkedQueue [workers ];
176+ for (int i = 0 ; i < workers ; i ++) {
177+ this .processorWorkQueue [i ] = new ConcurrentLinkedQueue <Integer >();
178+ }
103179 }
104180
105181 @ Override
@@ -110,9 +186,16 @@ protected ProcessingTask createProcessingTask() {
110186 @ Override
111187 protected boolean send (final Message message ) {
112188 if (!shutdown ) {
113- if (qsize .get () < qcapacity ) {
114- messages .offer (message );
115- qsize .incrementAndGet ();
189+ long threadId = Thread .currentThread ().getId ();
190+ // modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain;
191+ // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
192+ int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
193+ int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
194+
195+ if (qsize [shard ].get () < qcapacity ) {
196+ messages [shard ].offer (message );
197+ qsize [shard ].incrementAndGet ();
198+ processorWorkQueue [processQueue ].offer (shard );
116199 return true ;
117200 }
118201 }
0 commit comments