-
Notifications
You must be signed in to change notification settings - Fork 345
Expand file tree
/
Copy pathCyclicThreadPoolDeadLockDemo.java
More file actions
104 lines (99 loc) · 3.99 KB
/
CyclicThreadPoolDeadLockDemo.java
File metadata and controls
104 lines (99 loc) · 3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package fucking.concurrency.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.CompletableFuture.supplyAsync;
/**
* @author Eric Lin (linqinghua4 at gmail dot com)
*/
public class CyclicThreadPoolDeadLockDemo {
public static void main(String[] args) throws InterruptedException {
if (args.length > 0 && "good".equals(args[0])) {
goodCase();
} else {
badCase();
}
}
static void badCase() throws InterruptedException {
int poolSize = 16;
ThreadPoolExecutor pool1 = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
Future<Integer> future = pool1.submit(() -> {
System.out.println("step1, i = " + finalI);
return 1 + getUnchecked(pool2.submit(() -> {
System.out.println("step2, i = " + finalI);
return 2 + getUnchecked(pool1.submit(() -> {
System.out.println("step3, i = " + finalI);
return 3;
}));
}));
});
futures.add(future);
}
// 无法计算,死锁
int result = futures.stream()
.mapToInt(CyclicThreadPoolDeadLockDemo::getUnchecked)
.sum();
System.out.println("result = " + result);
// 无法关闭
pool1.awaitTermination(20, TimeUnit.SECONDS);
pool2.awaitTermination(20, TimeUnit.SECONDS);
}
static void goodCase() throws InterruptedException {
int poolSize = 16;
ThreadPoolExecutor pool1 = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
CompletableFuture<Integer> cf1 = supplyAsync(() -> {
System.out.println("step1, i = " + finalI);
return 1;
}, pool1);
CompletableFuture<Integer> cf2 = supplyAsync(() -> {
System.out.println("step2, i = " + finalI);
return 2;
}, pool2);
CompletableFuture<Integer> cf3 = supplyAsync(() -> {
System.out.println("step3, i = " + finalI);
return 3;
}, pool1);
Future<Integer> future =
cf1.thenComposeAsync(x ->
cf2.thenComposeAsync(y ->
cf3.thenApply(z ->
x + y + z), pool2), pool1);
futures.add(future);
}
System.out.println("size1 = " + pool1.getQueue().size());
System.out.println("size2 = " + pool2.getQueue().size());
int result = futures.stream()
.mapToInt(CyclicThreadPoolDeadLockDemo::getUnchecked)
.sum();
System.out.println("result = " + result);
pool1.awaitTermination(20, TimeUnit.SECONDS);
pool2.awaitTermination(20, TimeUnit.SECONDS);
}
static <T> T getUnchecked(Future<T> future) {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}