File tree Expand file tree Collapse file tree
src/main/java/com/timgroup/statsd Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -112,11 +112,9 @@ protected ProcessingTask createProcessingTask() {
112112 @ Override
113113 protected boolean send (final Message message ) {
114114 try {
115- long threadId = Thread .currentThread ().getId ();
116- // modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
117- // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
118- int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
119- int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
115+ int threadId = getThreadId ();
116+ int shard = threadId % lockShardGrain ;
117+ int processQueue = threadId % workers ;
120118
121119 if (!shutdown ) {
122120 messages [shard ].put (message );
Original file line number Diff line number Diff line change @@ -186,11 +186,9 @@ protected ProcessingTask createProcessingTask() {
186186 @ Override
187187 protected boolean send (final Message message ) {
188188 if (!shutdown ) {
189- long threadId = Thread .currentThread ().getId ();
190- // modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain;
191- // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
192- int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
193- int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
189+ int threadId = getThreadId ();
190+ int shard = threadId % lockShardGrain ;
191+ int processQueue = threadId % workers ;
194192
195193 if (qsize [shard ].get () < qcapacity ) {
196194 messages [shard ].offer (message );
Original file line number Diff line number Diff line change @@ -27,12 +27,21 @@ public abstract class StatsDProcessor implements Runnable {
2727 protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer" ;
2828 protected static final int WAIT_SLEEP_MS = 10 ; // 10 ms would be a 100HZ slice
2929
30+ // Atomic integer containing the next thread ID to be assigned
31+ private static final AtomicInteger nextId = new AtomicInteger (0 );
32+
3033 protected final StatsDClientErrorHandler handler ;
3134
3235 protected final BufferPool bufferPool ;
3336 protected final BlockingQueue <ByteBuffer > outboundQueue ; // FIFO queue with outbound buffers
3437 protected final ExecutorService executor ;
3538 protected final CountDownLatch endSignal ;
39+ protected static final ThreadLocal <Integer > threadId = new ThreadLocal <Integer >() {
40+ @ Override
41+ protected Integer initialValue () {
42+ return nextId .getAndIncrement ();
43+ }
44+ };
3645
3746 protected final int workers ;
3847 protected final int lockShardGrain ;
@@ -97,6 +106,11 @@ public BlockingQueue<ByteBuffer> getOutboundQueue() {
97106 return this .outboundQueue ;
98107 }
99108
109+ // Returns the current thread's unique ID, assigning it if necessary
110+ public static int getThreadId () {
111+ return threadId .get ().intValue ();
112+ }
113+
100114 @ Override
101115 public void run () {
102116
You can’t perform that action at this time.
0 commit comments