2020import io .rsocket .core .RSocketConnector ;
2121import io .rsocket .plugins .RequestInterceptor ;
2222import java .util .List ;
23- import java .util .SplittableRandom ;
23+ import java .util .Map ;
2424import java .util .concurrent .ConcurrentHashMap ;
25- import java .util .concurrent .ConcurrentMap ;
2625import java .util .concurrent .ThreadLocalRandom ;
2726import java .util .function .Function ;
2827import reactor .util .annotation .Nullable ;
@@ -37,32 +36,13 @@ public class WeightedLoadbalanceStrategy implements ClientLoadbalanceStrategy {
3736
3837 private static final double EXP_FACTOR = 4.0 ;
3938
40- private static final int EFFORT = 5 ;
41-
42- final int effort ;
43- final SplittableRandom splittableRandom ;
39+ final int maxPairSelectionAttempts ;
4440 final Function <RSocket , WeightedStats > weightedStatsResolver ;
4541
46- public WeightedLoadbalanceStrategy () {
47- this (new DefaultWeightedStatsResolver ());
48- }
49-
50- public WeightedLoadbalanceStrategy (Function <RSocket , WeightedStats > weightedStatsResolver ) {
51- this (EFFORT , weightedStatsResolver );
52- }
53-
54- public WeightedLoadbalanceStrategy (
55- int effort , Function <RSocket , WeightedStats > weightedStatsResolver ) {
56- this (effort , new SplittableRandom (System .nanoTime ()), weightedStatsResolver );
57- }
58-
59- public WeightedLoadbalanceStrategy (
60- int effort ,
61- SplittableRandom splittableRandom ,
62- Function <RSocket , WeightedStats > weightedStatsResolver ) {
63- this .effort = effort ;
64- this .splittableRandom = splittableRandom ;
65- this .weightedStatsResolver = weightedStatsResolver ;
42+ private WeightedLoadbalanceStrategy (
43+ int numberOfAttempts , @ Nullable Function <RSocket , WeightedStats > resolver ) {
44+ this .maxPairSelectionAttempts = numberOfAttempts ;
45+ this .weightedStatsResolver = (resolver != null ? resolver : new DefaultWeightedStatsResolver ());
6646 }
6747
6848 @ Override
@@ -75,7 +55,7 @@ public void initialize(RSocketConnector connector) {
7555
7656 @ Override
7757 public RSocket select (List <RSocket > sockets ) {
78- final int effort = this .effort ;
58+ final int numberOfAttepmts = this .maxPairSelectionAttempts ;
7959 final int size = sockets .size ();
8060
8161 RSocket weightedRSocket ;
@@ -103,7 +83,7 @@ public RSocket select(List<RSocket> sockets) {
10383 RSocket rsc1 = null ;
10484 RSocket rsc2 = null ;
10585
106- for (int i = 0 ; i < effort ; i ++) {
86+ for (int i = 0 ; i < numberOfAttepmts ; i ++) {
10787 int i1 = ThreadLocalRandom .current ().nextInt (size );
10888 int i2 = ThreadLocalRandom .current ().nextInt (size - 1 );
10989
@@ -168,30 +148,83 @@ private static double calculateFactor(final double u, final double l, final doub
168148 return Math .pow (1 + alpha , EXP_FACTOR );
169149 }
170150
171- static class DefaultWeightedStatsResolver implements Function <RSocket , WeightedStats > {
151+ /** Create an instance of {@link WeightedLoadbalanceStrategy} with default settings. */
152+ public static WeightedLoadbalanceStrategy create () {
153+ return new Builder ().build ();
154+ }
155+
156+ /** Return a builder to create a {@link WeightedLoadbalanceStrategy} with. */
157+ public static Builder builder () {
158+ return new Builder ();
159+ }
160+
161+ /** Builder for {@link WeightedLoadbalanceStrategy}. */
162+ public static class Builder {
163+
164+ private int maxPairSelectionAttempts = 5 ;
165+
166+ @ Nullable private Function <RSocket , WeightedStats > weightedStatsResolver ;
167+
168+ private Builder () {}
169+
170+ /**
171+ * How many times to try to randomly select a pair of RSocket connections with non-zero
172+ * availability. This is applicable when there are more than two connections in the pool. If the
173+ * number of attempts is exceeded, the last selected pair is used.
174+ *
175+ * <p>By default this is set to 5.
176+ *
177+ * @param numberOfAttempts the iteration count
178+ */
179+ public Builder maxPairSelectionAttempts (int numberOfAttempts ) {
180+ this .maxPairSelectionAttempts = numberOfAttempts ;
181+ return this ;
182+ }
183+
184+ /**
185+ * Configure how the created {@link WeightedLoadbalanceStrategy} should find the stats for a
186+ * given RSocket.
187+ *
188+ * <p>By default {@code WeightedLoadbalanceStrategy} installs a {@code RequestInterceptor} when
189+ * {@link ClientLoadbalanceStrategy#initialize(RSocketConnector)} is called in order to keep
190+ * track of stats.
191+ *
192+ * @param resolver the function to find the stats for an RSocket
193+ */
194+ public Builder weightedStatsResolver (Function <RSocket , WeightedStats > resolver ) {
195+ this .weightedStatsResolver = resolver ;
196+ return this ;
197+ }
198+
199+ public WeightedLoadbalanceStrategy build () {
200+ return new WeightedLoadbalanceStrategy (
201+ this .maxPairSelectionAttempts , this .weightedStatsResolver );
202+ }
203+ }
204+
205+ private static class DefaultWeightedStatsResolver implements Function <RSocket , WeightedStats > {
172206
173- final ConcurrentMap <RSocket , WeightedStatsRequestInterceptor > rsocketsInterceptors =
174- new ConcurrentHashMap <>();
207+ final Map <RSocket , WeightedStats > statsMap = new ConcurrentHashMap <>();
175208
176209 @ Override
177210 public WeightedStats apply (RSocket rSocket ) {
178- return rsocketsInterceptors .get (rSocket );
211+ return statsMap .get (rSocket );
179212 }
180213
181214 void init (RSocketConnector connector ) {
182215 connector .interceptors (
183- ir ->
184- ir .forRequester (
216+ registry ->
217+ registry .forRequester (
185218 (Function <RSocket , ? extends RequestInterceptor >)
186219 rSocket -> {
187220 final WeightedStatsRequestInterceptor interceptor =
188221 new WeightedStatsRequestInterceptor () {
189222 @ Override
190223 public void dispose () {
191- rsocketsInterceptors .remove (rSocket );
224+ statsMap .remove (rSocket );
192225 }
193226 };
194- rsocketsInterceptors .put (rSocket , interceptor );
227+ statsMap .put (rSocket , interceptor );
195228
196229 return interceptor ;
197230 }));
0 commit comments