File tree Expand file tree Collapse file tree
src/main/java/com/timgroup/statsd Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -129,11 +129,9 @@ protected ProcessingTask createProcessingTask() {
129129 @ Override
130130 protected boolean send (final Message message ) {
131131 try {
132- long threadId = Thread .currentThread ().getId ();
133- // modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
134- // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
135- int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
136- int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
132+ int threadId = getThreadId ();
133+ int shard = threadId % lockShardGrain ;
134+ int processQueue = threadId % workers ;
137135
138136 if (!shutdown ) {
139137 messages [shard ].put (message );
Original file line number Diff line number Diff line change @@ -157,11 +157,9 @@ protected ProcessingTask createProcessingTask() {
157157 @ Override
158158 protected boolean send (final Message message ) {
159159 if (!shutdown ) {
160- long threadId = Thread .currentThread ().getId ();
161- // modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain;
162- // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
163- int shard = (int )((threadId * (long )this .lockShardGrain ) >> 32 );
164- int processQueue = (int )((threadId * (long )this .workers ) >> 32 );
160+ int threadId = getThreadId ();
161+ int shard = threadId % lockShardGrain ;
162+ int processQueue = threadId % workers ;
165163
166164 if (qsize [shard ].get () < qcapacity ) {
167165 messages [shard ].offer (message );
Original file line number Diff line number Diff line change @@ -26,13 +26,22 @@ public abstract class StatsDProcessor implements Runnable {
2626 protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer" ;
2727 protected static final int WAIT_SLEEP_MS = 10 ; // 10 ms would be a 100HZ slice
2828
29+ // Atomic integer containing the next thread ID to be assigned
30+ private static final AtomicInteger nextId = new AtomicInteger (0 );
31+
2932 protected final StatsDClientErrorHandler handler ;
3033
3134 protected final BufferPool bufferPool ;
3235 protected final Queue <Message > highPrioMessages ; // FIFO queue for high priority messages
3336 protected final BlockingQueue <ByteBuffer > outboundQueue ; // FIFO queue with outbound buffers
3437 protected final ExecutorService executor ;
3538 protected final CountDownLatch endSignal ;
39+ protected static final ThreadLocal <Integer > threadId = new ThreadLocal <Integer >() {
40+ @ Override
41+ protected Integer initialValue () {
42+ return nextId .getAndIncrement ();
43+ }
44+ };
3645
3746 protected final int workers ;
3847 protected final int qcapacity ;
@@ -154,6 +163,11 @@ public int getQcapacity() {
154163 return this .qcapacity ;
155164 }
156165
166+ // Returns the current thread's unique ID, assigning it if necessary
167+ public static int getThreadId () {
168+ return threadId .get ().intValue ();
169+ }
170+
157171 @ Override
158172 public void run () {
159173
You can’t perform that action at this time.
0 commit comments