Skip to content

Commit 137ff23

Browse files
committed
[waterflow] fix: clean flow locks
1 parent d51d20a commit 137ff23

File tree

2 files changed

+84
-21
lines changed

2 files changed

+84
-21
lines changed

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowlock/FlowLocks.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@
99
import modelengine.fit.waterflow.domain.common.Constants;
1010
import modelengine.fitframework.util.StringUtils;
1111

12-
import java.util.Map;
13-
import java.util.Optional;
14-
import java.util.concurrent.ConcurrentHashMap;
1512
import java.util.concurrent.locks.Lock;
16-
import java.util.concurrent.locks.ReentrantLock;
1713

1814
/**
1915
* 流程实例的锁接口
@@ -22,10 +18,6 @@
2218
* @since 1.0
2319
*/
2420
public interface FlowLocks {
25-
/**
26-
* 本地锁全局静态对象
27-
*/
28-
Map<String, Lock> locks = new ConcurrentHashMap<>();
2921

3022
/**
3123
* 节点分布式锁key前缀
@@ -38,9 +30,7 @@ public interface FlowLocks {
3830
* @param key 获取本地锁的key值,一般是流程版本的streamID
3931
* @return {@link Lock} 锁对象
4032
*/
41-
default Lock getLocalLock(String key) {
42-
return Optional.ofNullable(locks.putIfAbsent(key, new ReentrantLock())).orElseGet(() -> locks.get(key));
43-
}
33+
Lock getLocalLock(String key);
4434

4535
/**
4636
* 获取分布式锁
@@ -50,16 +40,6 @@ default Lock getLocalLock(String key) {
5040
*/
5141
Lock getDistributeLock(String key);
5242

53-
/**
54-
* 删除本地锁
55-
* TODO xiangyu 删除流程定义的时候需要删除该定义的本地锁资源
56-
*
57-
* @param key 删除本地锁的key值,一般是流程版本的streamID
58-
*/
59-
default void removeLocalLock(String key) {
60-
locks.remove(key);
61-
}
62-
6343
/**
6444
* 获取节点分布式锁key值
6545
* 获取分布式锁的key值,一般是prefix-streamID-nodeID-type

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowlock/FlowLocksMemo.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@
66

77
package modelengine.fit.waterflow.domain.context.repo.flowlock;
88

9+
import java.util.Map;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Condition;
914
import java.util.concurrent.locks.Lock;
15+
import java.util.concurrent.locks.ReentrantLock;
1016

1117
/**
1218
* 流程锁,内存版本的实现
@@ -15,6 +21,18 @@
1521
* @since 1.0
1622
*/
1723
public class FlowLocksMemo implements FlowLocks {
24+
private final Map<String, MemLockWrapper> locks = new ConcurrentHashMap<>();
25+
26+
@Override
27+
public Lock getLocalLock(String key) {
28+
return locks.compute(key, (__, value) -> {
29+
if (value == null) {
30+
return new MemLockWrapper(key, new ReentrantLock(), this);
31+
}
32+
return value;
33+
});
34+
}
35+
1836
/**
1937
* 获取分布式锁
2038
* 获取分布式锁的key值,一般是prefix-streamID-nodeID-suffixes
@@ -27,4 +45,69 @@ public class FlowLocksMemo implements FlowLocks {
2745
public Lock getDistributeLock(String key) {
2846
return getLocalLock(key);
2947
}
48+
49+
private void tryCleanLocalLock(String key) {
50+
this.locks.compute(key, (__, value) -> {
51+
if (value == null) {
52+
return null;
53+
}
54+
if (value.getRefCount() == 0) {
55+
return null;
56+
}
57+
return value;
58+
});
59+
}
60+
61+
private static class MemLockWrapper implements Lock {
62+
private final String key;
63+
private final AtomicInteger refCount = new AtomicInteger(1);
64+
private final ReentrantLock target;
65+
private final FlowLocksMemo locksMemo;
66+
67+
private MemLockWrapper(String key, ReentrantLock target, FlowLocksMemo locksMemo) {
68+
this.key = key;
69+
this.target = target;
70+
this.locksMemo = locksMemo;
71+
}
72+
73+
@Override
74+
public void lock() {
75+
this.target.lock();
76+
}
77+
78+
@Override
79+
public void lockInterruptibly() throws InterruptedException {
80+
this.target.lockInterruptibly();
81+
}
82+
83+
@Override
84+
public boolean tryLock() {
85+
return this.target.tryLock();
86+
}
87+
88+
@Override
89+
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
90+
return this.target.tryLock(time, unit);
91+
}
92+
93+
@Override
94+
public void unlock() {
95+
this.target.unlock();
96+
this.refCount.decrementAndGet();
97+
this.locksMemo.tryCleanLocalLock(this.key);
98+
}
99+
100+
@Override
101+
public Condition newCondition() {
102+
return this.target.newCondition();
103+
}
104+
105+
private void addRef() {
106+
this.refCount.incrementAndGet();
107+
}
108+
109+
private int getRefCount() {
110+
return this.refCount.get();
111+
}
112+
}
30113
}

0 commit comments

Comments
 (0)