Skip to content

Commit fa2cfd5

Browse files
committed
Initial commit
0 parents  commit fa2cfd5

4 files changed

Lines changed: 373 additions & 0 deletions

File tree

.gitignore

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Gradle
2+
.gradle/
3+
build/
4+
5+
# IntelliJ
6+
.idea/
7+
*.iml
8+
out/
9+
/target
10+
11+
# Compiled
12+
*.class
13+
*.jar
14+
15+
# OS files
16+
.DS_Store
17+
Thumbs.db

pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>refined</groupId>
8+
<artifactId>DynamicThread</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>26</maven.compiler.source>
13+
<maven.compiler.target>26</maven.compiler.target>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
</properties>
16+
<repositories>
17+
<repository>
18+
<id>jitpack.io</id>
19+
<url>https://jitpack.io</url>
20+
</repository>
21+
</repositories>
22+
</project>
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
package org.DynamicThread;
2+
3+
import java.util.function.Consumer;
4+
import java.util.function.Function;
5+
import java.util.function.Supplier;
6+
7+
@SuppressWarnings({"unused", "unchecked"})
8+
public final class AsyncTask<T> {
9+
10+
public static AsyncTask<Void> of(Runnable runnable) {
11+
return new AsyncTask<>(() -> {
12+
runnable.run();
13+
return null;
14+
}, null);
15+
}
16+
17+
public static <T> AsyncTask<T> of(Supplier<T> supplier) {
18+
return new AsyncTask<>(supplier, null);
19+
}
20+
21+
private static final class Context {
22+
volatile boolean cancelled;
23+
volatile boolean timedOut;
24+
}
25+
26+
private final Supplier<T> supplier;
27+
private final Context ctx;
28+
private final AsyncTask<?> root;
29+
30+
private AsyncTask<?> tail;
31+
32+
private Runnable onStart;
33+
private Consumer<T> onComplete;
34+
private Runnable onTimeout;
35+
private Consumer<Throwable> onError;
36+
37+
private volatile boolean completed;
38+
private volatile boolean started;
39+
private volatile long timeoutMillis = 60000L;
40+
private volatile Thread taskThread;
41+
private volatile T result;
42+
43+
private AsyncTask(Supplier<T> supplier, AsyncTask<?> root) {
44+
this.supplier = supplier;
45+
this.ctx = (root == null) ? new Context() : root.ctx;
46+
this.root = (root == null ? this : root);
47+
this.tail = this;
48+
}
49+
50+
public AsyncTask<Void> after(Runnable... runnables) {
51+
return after(() -> {
52+
for (Runnable r : runnables) {
53+
r.run();
54+
}
55+
return null;
56+
});
57+
}
58+
59+
public <N> AsyncTask<N> after(Supplier<N> nextSupplier) {
60+
AsyncTask<N> next = new AsyncTask<>(nextSupplier, root);
61+
62+
synchronized (this) {
63+
if (completed) {
64+
if (!ctx.cancelled && !ctx.timedOut) {
65+
next.startInternal();
66+
}
67+
} else {
68+
Consumer<T> previous = onComplete;
69+
onComplete = res -> {
70+
if (previous != null) {
71+
previous.accept(res);
72+
}
73+
if (!ctx.cancelled && !ctx.timedOut) {
74+
next.startInternal();
75+
}
76+
};
77+
}
78+
}
79+
80+
root.tail = next;
81+
return next;
82+
}
83+
public <N> AsyncTask<N> map(Function<T, N> fn) {
84+
AsyncTask<N> next = new AsyncTask<>(() -> fn.apply(result), root);
85+
86+
synchronized (this) {
87+
if (completed) {
88+
if (!ctx.cancelled && !ctx.timedOut) {
89+
next.startInternal();
90+
}
91+
} else {
92+
Consumer<T> previous = onComplete;
93+
onComplete = res -> {
94+
if (previous != null) {
95+
previous.accept(res);
96+
}
97+
if (!ctx.cancelled && !ctx.timedOut) {
98+
next.startInternal();
99+
}
100+
};
101+
}
102+
}
103+
104+
root.tail = next;
105+
return next;
106+
}
107+
108+
public AsyncTask<T> start() {
109+
synchronized (root) {
110+
if (!root.started) {
111+
root.started = true;
112+
root.startInternal();
113+
}
114+
}
115+
return (AsyncTask<T>) root.tail;
116+
}
117+
118+
void startInternal() {
119+
TaskManager.submit(() -> {
120+
if (onStart != null) {
121+
onStart.run();
122+
}
123+
124+
Thread localThread = new Thread(() -> {
125+
try {
126+
if (!ctx.cancelled && !ctx.timedOut) {
127+
result = supplier.get();
128+
}
129+
} catch (Throwable t) {
130+
if (onError != null) onError.accept(t);
131+
}
132+
});
133+
134+
synchronized (this) {
135+
taskThread = localThread;
136+
}
137+
138+
localThread.start();
139+
140+
boolean finished = false;
141+
long timeout = timeoutMillis;
142+
143+
try {
144+
if (timeout > 0) {
145+
long end = System.currentTimeMillis() + timeout;
146+
while (!finished && !ctx.cancelled) {
147+
long remaining = end - System.currentTimeMillis();
148+
if (remaining <= 0) {
149+
break;
150+
}
151+
localThread.join(remaining);
152+
if (!localThread.isAlive()) {
153+
finished = true;
154+
}
155+
}
156+
}
157+
else {
158+
localThread.join();
159+
finished = true;
160+
}
161+
} catch (Throwable error) {
162+
if (onError != null) onError.accept(error);
163+
}
164+
165+
if (!finished && !ctx.cancelled) {
166+
ctx.timedOut = true;
167+
localThread.interrupt();
168+
if (onTimeout != null) {
169+
onTimeout.run();
170+
}
171+
}
172+
173+
if (!ctx.cancelled && !ctx.timedOut) {
174+
Consumer<T> completeHandler = onComplete;
175+
if (completeHandler != null) {
176+
completeHandler.accept(result);
177+
}
178+
}
179+
180+
synchronized (this) {
181+
completed = true;
182+
this.notifyAll();
183+
}
184+
});
185+
}
186+
187+
public synchronized T join() {
188+
while (!completed) {
189+
try {
190+
this.wait();
191+
} catch (InterruptedException error) {
192+
if (onError != null) onError.accept(error);
193+
}
194+
}
195+
return result;
196+
}
197+
198+
public synchronized T join(long timeoutMillis) {
199+
if (!completed && timeoutMillis > 0) {
200+
long end = System.currentTimeMillis() + timeoutMillis;
201+
while (!completed) {
202+
long remaining = end - System.currentTimeMillis();
203+
if (remaining <= 0) {
204+
break;
205+
}
206+
try {
207+
this.wait(remaining);
208+
} catch (Throwable error) {
209+
if (onError != null) onError.accept(error);
210+
}
211+
}
212+
} else {
213+
while (!completed) {
214+
try {
215+
this.wait();
216+
} catch (Throwable error) {
217+
if (onError != null) onError.accept(error);
218+
}
219+
}
220+
}
221+
return result;
222+
}
223+
224+
public boolean isCompleted() {
225+
return completed;
226+
}
227+
228+
public boolean isCancelled() {
229+
return ctx.cancelled;
230+
}
231+
232+
public boolean isTimedOut() {
233+
return ctx.timedOut;
234+
}
235+
236+
public AsyncTask<T> timeout(long timeoutMillis) {
237+
this.timeoutMillis = timeoutMillis;
238+
return this;
239+
}
240+
241+
public void cancel() {
242+
ctx.cancelled = true;
243+
synchronized (this) {
244+
if (completed) {
245+
return;
246+
}
247+
Thread t = taskThread;
248+
if (t != null) {
249+
t.interrupt();
250+
}
251+
completed = true;
252+
this.notifyAll();
253+
}
254+
}
255+
256+
public AsyncTask<T> onStart(Runnable runnable) {
257+
this.onStart = runnable;
258+
return this;
259+
}
260+
261+
public AsyncTask<T> onComplete(Consumer<T> consumer) {
262+
@SuppressWarnings("unchecked")
263+
AsyncTask<T> target = (AsyncTask<T>) (root.tail != null ? root.tail : this);
264+
265+
Consumer<T> previous = target.onComplete;
266+
target.onComplete = res -> {
267+
if (previous != null) {
268+
previous.accept(res);
269+
}
270+
consumer.accept(res);
271+
};
272+
return target;
273+
}
274+
275+
public AsyncTask<T> onTimeout(Runnable runnable) {
276+
this.onTimeout = runnable;
277+
return this;
278+
}
279+
280+
public AsyncTask<T> onError(Consumer<Throwable> consumer) {
281+
this.onError = consumer;
282+
return this;
283+
}
284+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.DynamicThread;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.BlockingQueue;
6+
import java.util.concurrent.LinkedBlockingQueue;
7+
8+
@SuppressWarnings({"ResultOfMethodCallIgnored", "unused"})
9+
public final class TaskManager {
10+
public static void disable() {
11+
running = false;
12+
for (Thread t : THREADS) {
13+
t.interrupt();
14+
}
15+
THREADS.clear();
16+
QUEUE.clear();
17+
}
18+
private static final int WORKERS;
19+
private static final BlockingQueue<Runnable> QUEUE;
20+
private static final List<Thread> THREADS;
21+
22+
private static volatile boolean running = true;
23+
24+
static {
25+
WORKERS = Runtime.getRuntime().availableProcessors();
26+
QUEUE = new LinkedBlockingQueue<>(WORKERS);
27+
THREADS = new ArrayList<>(WORKERS);
28+
for (int i = 0; i < WORKERS; i++) {
29+
Thread t = new Thread(() -> {
30+
while (running) {
31+
try {
32+
Runnable r = QUEUE.take();
33+
r.run();
34+
} catch (InterruptedException ignored) {}
35+
}
36+
});
37+
t.setDaemon(true);
38+
t.setName("DynamicThread-" + i);
39+
t.start();
40+
THREADS.add(t);
41+
}
42+
}
43+
44+
private TaskManager() {}
45+
46+
static void submit(Runnable r) {
47+
if (running) QUEUE.offer(r);
48+
}
49+
50+
}

0 commit comments

Comments
 (0)