Skip to content

Commit b6622e0

Browse files
Fix BalancerUtil to pick up all Clusters: port changes from f57e1b6 to 7.X (#2952)
* port changes from f57e1b6 to 7.X * Refactor flow handling in `BalancerUtil` to improve readability and streamline interceptor processing
1 parent 79a231a commit b6622e0

1 file changed

Lines changed: 98 additions & 48 deletions

File tree

core/src/main/java/com/predic8/membrane/core/interceptor/balancer/BalancerUtil.java

Lines changed: 98 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,71 +15,121 @@
1515
package com.predic8.membrane.core.interceptor.balancer;
1616

1717
import com.predic8.membrane.core.interceptor.*;
18+
import com.predic8.membrane.core.interceptor.flow.AbstractFlowInterceptor;
19+
import com.predic8.membrane.core.interceptor.flow.IfInterceptor;
20+
import com.predic8.membrane.core.interceptor.flow.choice.AbstractCaseOtherwise;
21+
import com.predic8.membrane.core.interceptor.flow.choice.ChooseInterceptor;
1822
import com.predic8.membrane.core.proxies.*;
19-
import com.predic8.membrane.core.router.*;
23+
import com.predic8.membrane.core.router.Router;
2024

2125
import java.util.*;
26+
import java.util.stream.Stream;
2227

2328
public class BalancerUtil {
2429

25-
public static List<Cluster> collectClusters(Router router) {
26-
ArrayList<Cluster> result = new ArrayList<>();
27-
for (Proxy r : router.getRuleManager().getRules()) {
28-
List<Interceptor> interceptors = r.getFlow();
29-
if (interceptors != null)
30-
for (Interceptor i : interceptors)
31-
if (i instanceof LoadBalancingInterceptor)
32-
result.addAll(((LoadBalancingInterceptor)i).getClusterManager().getClusters());
30+
/**
31+
* The various getFlow() methods only expose the direct child flow of an interceptor.
32+
* Branching interceptors such as if/else and choose/case keep additional flow lists
33+
* outside of getFlow(), so those branches must be added explicitly while walking the flow tree.
34+
*/
35+
private static Stream<List<Interceptor>> allFlows(Router router) {
36+
Set<Interceptor> visited = Collections.newSetFromMap(new IdentityHashMap<>());
37+
return Stream.concat(ruleFlows(router), globalFlows(router))
38+
.flatMap(flow -> allFlows(flow, visited));
39+
}
40+
41+
private static Stream<List<Interceptor>> ruleFlows(Router router) {
42+
return router.getRuleManager()
43+
.getRules()
44+
.stream()
45+
.map(Proxy::getFlow);
46+
}
47+
48+
private static Stream<List<Interceptor>> globalFlows(Router router) {
49+
return Optional.ofNullable(router.getRegistry())
50+
.stream()
51+
.flatMap(registry -> registry.getBean(GlobalInterceptor.class).stream())
52+
.map(GlobalInterceptor::getFlow);
53+
}
54+
55+
private static Stream<List<Interceptor>> allFlows(List<Interceptor> flow, Set<Interceptor> visited) {
56+
if (flow == null) {
57+
return Stream.empty();
3358
}
34-
return result;
59+
return Stream.concat(
60+
Stream.of(flow),
61+
flow.stream().flatMap(interceptor -> childFlows(interceptor, visited))
62+
);
3563
}
3664

37-
public static List<LoadBalancingInterceptor> collectBalancers(Router router) {
38-
ArrayList<LoadBalancingInterceptor> result = new ArrayList<>();
39-
for (Proxy r : router.getRuleManager().getRules()) {
40-
List<Interceptor> interceptors = r.getFlow();
41-
if (interceptors != null)
42-
for (Interceptor i : interceptors)
43-
if (i instanceof LoadBalancingInterceptor)
44-
result.add((LoadBalancingInterceptor)i);
65+
private static Stream<List<Interceptor>> childFlows(Interceptor interceptor, Set<Interceptor> visited) {
66+
if (interceptor == null || !visited.add(interceptor)) {
67+
return Stream.empty();
4568
}
46-
return result;
69+
return directChildFlows(interceptor)
70+
.flatMap(flow -> allFlows(flow, visited));
4771
}
4872

49-
public static Balancer lookupBalancer(Router router, String name) {
50-
for (Proxy r : router.getRuleManager().getRules()) {
51-
List<Interceptor> interceptors = r.getFlow();
52-
if (interceptors != null)
53-
for (Interceptor i : interceptors)
54-
if (i instanceof LoadBalancingInterceptor)
55-
if (((LoadBalancingInterceptor)i).getName().equalsIgnoreCase(name))
56-
return ((LoadBalancingInterceptor) i).getClusterManager();
73+
private static Stream<List<Interceptor>> directChildFlows(Interceptor interceptor) {
74+
if (interceptor instanceof ChooseInterceptor chooseInterceptor) {
75+
return chooseInterceptor.getChoices().stream()
76+
.map(AbstractCaseOtherwise::getFlow);
5777
}
58-
throw new RuntimeException("balancer with name \"" + name + "\" not found.");
78+
if (interceptor instanceof IfInterceptor ifInterceptor) {
79+
return Stream.of(ifInterceptor.getFlow(), ifInterceptor.getElseInterceptor());
80+
}
81+
if (interceptor instanceof AbstractFlowInterceptor flowInterceptor) {
82+
return Stream.of(flowInterceptor.getFlow());
83+
}
84+
return Stream.empty();
85+
}
86+
87+
private static Stream<Balancer> balancerBeans(Router router) {
88+
return Stream.concat(
89+
Optional.ofNullable(router.getRegistry())
90+
.stream()
91+
.flatMap(registry -> registry.getBeans(Balancer.class).stream()),
92+
Optional.ofNullable(router.getBeanFactory())
93+
.map(ctx -> ctx.getBeansOfType(Balancer.class).values().stream())
94+
.orElseGet(Stream::empty)
95+
).distinct();
96+
}
97+
98+
public static List<LoadBalancingInterceptor> collectBalancers(Router router) {
99+
return allFlows(router)
100+
.filter(Objects::nonNull)
101+
.flatMap(List::stream)
102+
.filter(LoadBalancingInterceptor.class::isInstance)
103+
.map(LoadBalancingInterceptor.class::cast)
104+
.distinct()
105+
.toList();
106+
}
107+
108+
public static List<Cluster> collectClusters(Router router) {
109+
return Stream.concat(
110+
collectBalancers(router).stream()
111+
.flatMap(lbi -> lbi.getClusterManager().getClusters().stream()),
112+
balancerBeans(router).flatMap(b -> b.getClusters().stream())
113+
).distinct().toList();
114+
}
115+
116+
public static Balancer lookupBalancer(Router router, String name) {
117+
return collectBalancers(router).stream()
118+
.filter(lbi -> lbi.getName() != null && lbi.getName().equalsIgnoreCase(name))
119+
.map(LoadBalancingInterceptor::getClusterManager)
120+
.findFirst()
121+
.orElseThrow(() -> new RuntimeException("balancer with name %s not found.".formatted(name)));
59122
}
60123

61124
public static LoadBalancingInterceptor lookupBalancerInterceptor(Router router, String name) {
62-
for (Proxy r : router.getRuleManager().getRules()) {
63-
List<Interceptor> interceptors = r.getFlow();
64-
if (interceptors != null)
65-
for (Interceptor i : interceptors)
66-
if (i instanceof LoadBalancingInterceptor)
67-
if (((LoadBalancingInterceptor)i).getName().equalsIgnoreCase(name))
68-
return (LoadBalancingInterceptor) i;
69-
}
70-
throw new RuntimeException("balancer with name \"" + name + "\" not found.");
125+
return collectBalancers(router).stream()
126+
.filter(lbi -> lbi.getName() != null && lbi.getName().equalsIgnoreCase(name))
127+
.findFirst()
128+
.orElseThrow(() -> new RuntimeException("balancer with name %s not found.".formatted(name)));
71129
}
72130

73131
public static boolean hasLoadBalancing(Router router) {
74-
for (Proxy r : router.getRuleManager().getRules()) {
75-
List<Interceptor> interceptors = r.getFlow();
76-
if (interceptors == null)
77-
continue;
78-
for (Interceptor i : interceptors)
79-
if (i instanceof LoadBalancingInterceptor)
80-
return true;
81-
}
82-
return false;
132+
return !collectBalancers(router).isEmpty();
83133
}
84134

85135
public static void up(Router router, String balancerName, String cName, String host, int port) {
@@ -122,8 +172,8 @@ public static List<Session> getSessionsByNode(Router router, String balancerName
122172
return lookupBalancer(router, balancerName).getSessionsByNode(cName, node);
123173
}
124174

125-
public static String getSingleClusterNameOrDefault(Balancer balancer){
126-
if(balancer.getClusters().size() == 1)
175+
public static String getSingleClusterNameOrDefault(Balancer balancer) {
176+
if (balancer.getClusters().size() == 1)
127177
return balancer.getClusters().getFirst().getName();
128178
return Cluster.DEFAULT_NAME;
129179
}

0 commit comments

Comments
 (0)