Skip to content

Commit 447cd64

Browse files
committed
[ci skip] add ThreadPool
1 parent 673899f commit 447cd64

3 files changed

Lines changed: 371 additions & 124 deletions

File tree

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.dreeam.leaf.async;
2+
3+
@org.jspecify.annotations.NullMarked
4+
public final class AsyncDispatcher {
5+
6+
public static final ThreadPool INSTANCE;
7+
8+
static {
9+
final String threadsProperty = System.getProperty("leaf.scheduler.threads");
10+
int numThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 1, 4);
11+
if (threadsProperty != null) {
12+
try {
13+
int i = Integer.parseInt(threadsProperty);
14+
if (i >= 1) {
15+
numThreads = i;
16+
}
17+
} catch (NumberFormatException ignored) {
18+
}
19+
}
20+
final String queueProperty = System.getProperty("leaf.scheduler.queue-size");
21+
int queue = 8192;
22+
if (queueProperty != null) {
23+
try {
24+
int j = Integer.parseInt(queueProperty);
25+
if (j >= 1) queue = j;
26+
} catch (NumberFormatException ignored) {
27+
}
28+
}
29+
INSTANCE = new ThreadPool(numThreads,
30+
queue,
31+
"Leaf Async Scheduler",
32+
Thread.NORM_PRIORITY - 1);
33+
}
34+
35+
private AsyncDispatcher() {
36+
}
37+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package org.dreeam.leaf.async;
2+
3+
import net.minecraft.util.Util;
4+
import org.apache.logging.log4j.LogManager;
5+
import org.dreeam.leaf.util.queue.MpmcQueue;
6+
import org.jspecify.annotations.NullMarked;
7+
import org.jspecify.annotations.Nullable;
8+
import org.apache.logging.log4j.Logger;
9+
10+
import java.util.concurrent.Callable;
11+
import java.util.concurrent.Executor;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.FutureTask;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.locks.LockSupport;
16+
17+
@NullMarked
18+
public final class ThreadPool implements Executor {
19+
private static final Logger LOGGER = LogManager.getLogger("Leaf");
20+
private static final long PARK_NANOS = 200_000L; // 0.2ms
21+
22+
private volatile boolean shutdown = false;
23+
private final Thread[] threads;
24+
private final MpmcQueue<Runnable> channel;
25+
private final MpmcQueue<Thread> parkChannel;
26+
27+
public ThreadPool(int numThreads, final int queue, final String prefix, final int priority) {
28+
if (numThreads <= 0) {
29+
throw new IllegalArgumentException();
30+
}
31+
numThreads = numThreads + 1;
32+
this.threads = new Thread[numThreads];
33+
this.channel = new MpmcQueue<>(queue);
34+
this.parkChannel = new MpmcQueue<>(numThreads);
35+
this.threads[0] = Thread.ofPlatform()
36+
.uncaughtExceptionHandler(Util::onThreadException)
37+
.daemon(false)
38+
.priority(priority + 1)
39+
.name(prefix + " Dispatcher")
40+
.start(new Dispatcher(this));
41+
for (int i = 1; i < numThreads; i++) {
42+
threads[i] = Thread.ofPlatform()
43+
.uncaughtExceptionHandler(Util::onThreadException)
44+
.daemon(false)
45+
.priority(priority)
46+
.name(prefix + " Worker - " + i)
47+
.start(new Worker(this));
48+
}
49+
}
50+
51+
@Override
52+
public void execute(Runnable task) {
53+
if (shutdown || !channel.send(task)) {
54+
task.run();
55+
}
56+
}
57+
58+
public boolean isShutdown() {
59+
return shutdown;
60+
}
61+
62+
public <V> FutureTask<V> submit(Runnable task, @Nullable V result) {
63+
final FutureTask<V> t = new FutureTask<>(Executors.callable(task, result));
64+
execute(t);
65+
return t;
66+
}
67+
68+
public <V> FutureTask<V> submit(Callable<V> task) {
69+
final FutureTask<V> t = new FutureTask<>(task);
70+
execute(t);
71+
return t;
72+
}
73+
74+
public void shutdown() {
75+
shutdown = true;
76+
for (final Thread thread : threads) {
77+
LockSupport.unpark(thread);
78+
}
79+
}
80+
81+
public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
82+
final long nanos = unit.toNanos(timeout);
83+
final long startTime = System.nanoTime();
84+
85+
boolean flag = true;
86+
for (final Thread worker : threads) {
87+
if (nanos <= 0L) {
88+
worker.join();
89+
continue;
90+
}
91+
final long remaining = startTime + nanos - System.nanoTime();
92+
if (remaining <= 0L) {
93+
flag = false;
94+
break;
95+
} else {
96+
worker.join(remaining / 1_000_000L, (int) (remaining % 1_000_000L));
97+
if (worker.isAlive()) {
98+
flag = false;
99+
break;
100+
}
101+
}
102+
}
103+
Runnable task;
104+
while ((task = channel.recv()) != null) {
105+
task.run();
106+
}
107+
return flag;
108+
}
109+
110+
public int workerCount() {
111+
return threads.length - 1;
112+
}
113+
114+
private record Worker(ThreadPool executor) implements Runnable {
115+
@Override
116+
public void run() {
117+
final MpmcQueue<Runnable> channel = executor.channel;
118+
final MpmcQueue<Thread> park = executor.parkChannel;
119+
while (true) {
120+
final Runnable task = channel.recv();
121+
if (task != null) {
122+
try {
123+
task.run();
124+
} catch (final Throwable e) {
125+
LOGGER.error("Task {} generated an exception: {}", task, Thread.currentThread().getName(), e);
126+
}
127+
} else if (executor.shutdown) {
128+
break;
129+
} else if (park.send(Thread.currentThread())) {
130+
LockSupport.park();
131+
if (Thread.interrupted()) {
132+
Thread.currentThread().interrupt();
133+
break;
134+
}
135+
} else {
136+
Thread.yield();
137+
}
138+
}
139+
}
140+
}
141+
142+
private record Dispatcher(ThreadPool executor) implements Runnable {
143+
@Override
144+
public void run() {
145+
final int threads = executor.threads.length - 1;
146+
final MpmcQueue<Runnable> channel = executor.channel;
147+
final MpmcQueue<Thread> park = executor.parkChannel;
148+
int backoff = 0;
149+
while (true) {
150+
final int len = channel.length();
151+
if (len != 0 && threads - park.length() < len) {
152+
backoff = 0;
153+
final Thread thread = park.recv();
154+
if (thread != null) {
155+
LockSupport.unpark(thread);
156+
}
157+
} else if (executor.shutdown) {
158+
break;
159+
} else if (backoff < 8) {
160+
backoff++;
161+
Thread.yield();
162+
} else {
163+
LockSupport.parkNanos(PARK_NANOS);
164+
if (Thread.interrupted()) {
165+
Thread.currentThread().interrupt();
166+
break;
167+
}
168+
}
169+
}
170+
Thread left;
171+
while ((left = park.recv()) != null) {
172+
LockSupport.unpark(left);
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)