Skip to content

Commit 396f369

Browse files
committed
[waterflow] fix: memory leak in ConditionFrom
1 parent 01803a8 commit 396f369

File tree

1 file changed

+0
-30
lines changed
  • framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes

1 file changed

+0
-30
lines changed

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,9 @@
1111
import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo;
1212
import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks;
1313
import modelengine.fit.waterflow.domain.enums.FlowNodeType;
14-
import modelengine.fit.waterflow.domain.stream.reactive.Subscription;
15-
import modelengine.fit.waterflow.domain.utils.IdGenerator;
1614
import modelengine.fit.waterflow.domain.utils.UUIDUtil;
1715

18-
import java.util.LinkedHashMap;
1916
import java.util.List;
20-
import java.util.Map;
21-
import java.util.Optional;
22-
import java.util.concurrent.ConcurrentHashMap;
2317
import java.util.stream.Collectors;
2418

2519
/**
@@ -77,44 +71,20 @@ private static <I> From<I> initFrom(String streamId, FlowContextRepo repo, FlowC
7771
}
7872

7973
private static class ConditionFrom<I> extends From<I> {
80-
private final Map<String, Map<String, Subscription<I>>> sessionSubscription = new ConcurrentHashMap<>();
81-
8274
public ConditionFrom(String streamId, FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) {
8375
super(streamId, repo, messenger, locks);
8476
}
8577

8678
@Override
8779
public void offer(List<FlowContext<I>> contexts) {
88-
this.offerUserContexts(contexts);
89-
}
90-
91-
private void offerUserContexts(List<FlowContext<I>> contexts) {
9280
this.getSubscriptions().forEach(subscription -> {
9381
List<FlowContext<I>> matched = contexts.stream()
9482
.filter(context -> subscription.getWhether().is(context.getData()))
95-
.peek(context -> {
96-
this.record(subscription, context);
97-
})
9883
.collect(Collectors.toList());
9984
matched.forEach(contexts::remove);
10085
subscription.cache(matched);
10186
});
10287
}
10388

104-
private void record(Subscription<I> subscription, FlowContext<I> context) {
105-
String sessionId = getSessionId(context);
106-
if (sessionId == null) {
107-
return;
108-
}
109-
this.sessionSubscription.putIfAbsent(sessionId, new LinkedHashMap<>());
110-
Map<String, Subscription<I>> subscriptionMap = this.sessionSubscription.get(sessionId);
111-
if (!subscriptionMap.containsKey(subscription.getId())) {
112-
subscriptionMap.put(subscription.getId(), subscription);
113-
}
114-
}
115-
116-
private String getSessionId(FlowContext<I> context) {
117-
return Optional.ofNullable(context.getSession()).map(IdGenerator::getId).orElse(null);
118-
}
11989
}
12090
}

0 commit comments

Comments
 (0)