11package com .timgroup .statsd ;
22
3+ import java .lang .Thread ;
4+
35import java .nio .ByteBuffer ;
46
57import java .util .concurrent .ArrayBlockingQueue ;
810
911public class StatsDBlockingProcessor extends StatsDProcessor {
1012
11- private final BlockingQueue <String > messages ;
13+ private final BlockingQueue <String >[] messages ;
14+ private final BlockingQueue <Integer >[] processorWorkQueue ;
15+
16+ private class ProcessingTask implements Runnable {
17+ private final int processorQueueId ;
18+
19+ public ProcessingTask (int id ) {
20+ this .processorQueueId = id ;
21+ }
22+
23+ public void run () {
24+ boolean empty ;
25+ ByteBuffer sendBuffer ;
26+
27+ try {
28+ sendBuffer = bufferPool .borrow ();
29+ } catch (final InterruptedException e ) {
30+ handler .handle (e );
31+ return ;
32+ }
33+
34+ while (!(processorWorkQueue [this .processorQueueId ].isEmpty () && shutdown )) {
35+
36+ try {
37+
38+ if (Thread .interrupted ()) {
39+ return ;
40+ }
41+
42+ final int messageQueueIdx = processorWorkQueue [this .processorQueueId ].poll ();
43+ final String message = messages [messageQueueIdx ].poll (WAIT_SLEEP_MS , TimeUnit .MILLISECONDS );
44+ if (message != null ) {
45+ final byte [] data = message .getBytes (MESSAGE_CHARSET );
46+ if (sendBuffer .capacity () < data .length ) {
47+ throw new InvalidMessageException (MESSAGE_TOO_LONG , message );
48+ }
49+ if (sendBuffer .remaining () < (data .length + 1 )) {
50+ outboundQueue .put (sendBuffer );
51+ sendBuffer = bufferPool .borrow ();
52+ }
53+ if (sendBuffer .position () > 0 ) {
54+ sendBuffer .put ((byte ) '\n' );
55+ }
56+ sendBuffer .put (data );
57+ if (null == processorWorkQueue [this .processorQueueId ].peek ()) {
58+ outboundQueue .put (sendBuffer );
59+ sendBuffer = bufferPool .borrow ();
60+ }
61+ }
62+ } catch (final InterruptedException e ) {
63+ if (shutdown ) {
64+ endSignal .countDown ();
65+ return ;
66+ }
67+ } catch (final Exception e ) {
68+ handler .handle (e );
69+ }
70+ }
71+ endSignal .countDown ();
72+ }
73+ }
1274
1375 StatsDBlockingProcessor (final int queueSize , final StatsDClientErrorHandler handler ,
14- final int maxPacketSizeBytes , final int poolSize , final int workers )
15- throws Exception {
76+ final int maxPacketSizeBytes , final int poolSize , final int workers ,
77+ final int lockShardGrain ) throws Exception {
78+
79+ super (queueSize , handler , maxPacketSizeBytes , poolSize , workers , lockShardGrain );
80+
81+ this .messages = new ArrayBlockingQueue [lockShardGrain ];
82+ for (int i = 0 ; i < lockShardGrain ; i ++) {
83+ this .messages [i ] = new ArrayBlockingQueue <String >(queueSize );
84+ }
1685
17- super (queueSize , handler , maxPacketSizeBytes , poolSize , workers );
18- this .messages = new ArrayBlockingQueue <String >(queueSize );
86+ this .processorWorkQueue = new ArrayBlockingQueue [workers ];
87+ for (int i = 0 ; i < workers ; i ++) {
88+ this .processorWorkQueue [i ] = new ArrayBlockingQueue <Integer >(queueSize );
89+ }
1990 }
2091
2192 @ Override
2293 boolean send (final String message ) {
2394 try {
95+ long threadId = Thread .currentThread ().getId ();
96+ // modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
97+ // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
98+ int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
99+ int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
100+
24101 if (!shutdown ) {
25- messages .put (message );
102+ messages [shard ].put (message );
103+ processorWorkQueue [processQueue ].put (shard );
26104 return true ;
27105 }
28106 } catch (InterruptedException e ) {
@@ -36,57 +114,7 @@ boolean send(final String message) {
36114 public void run () {
37115
38116 for (int i = 0 ; i < workers ; i ++) {
39- executor .submit (new Runnable () {
40- public void run () {
41- boolean empty ;
42- ByteBuffer sendBuffer ;
43-
44- try {
45- sendBuffer = bufferPool .borrow ();
46- } catch (final InterruptedException e ) {
47- handler .handle (e );
48- return ;
49- }
50-
51- while (!(messages .isEmpty () && shutdown )) {
52-
53- try {
54-
55- if (Thread .interrupted ()) {
56- return ;
57- }
58-
59- final String message = messages .poll (WAIT_SLEEP_MS , TimeUnit .MILLISECONDS );
60- if (message != null ) {
61- final byte [] data = message .getBytes (MESSAGE_CHARSET );
62- if (sendBuffer .capacity () < data .length ) {
63- throw new InvalidMessageException (MESSAGE_TOO_LONG , message );
64- }
65- if (sendBuffer .remaining () < (data .length + 1 )) {
66- outboundQueue .put (sendBuffer );
67- sendBuffer = bufferPool .borrow ();
68- }
69- if (sendBuffer .position () > 0 ) {
70- sendBuffer .put ((byte ) '\n' );
71- }
72- sendBuffer .put (data );
73- if (null == messages .peek ()) {
74- outboundQueue .put (sendBuffer );
75- sendBuffer = bufferPool .borrow ();
76- }
77- }
78- } catch (final InterruptedException e ) {
79- if (shutdown ) {
80- endSignal .countDown ();
81- return ;
82- }
83- } catch (final Exception e ) {
84- handler .handle (e );
85- }
86- }
87- endSignal .countDown ();
88- }
89- });
117+ executor .submit (new ProcessingTask (i ));
90118 }
91119
92120 boolean done = false ;
0 commit comments