From 4f2266cf8435ebdec0520362d1e799b16e1a5713 Mon Sep 17 00:00:00 2001
From: sinsy <550569627@qq.com>
Date: Fri, 26 Jan 2024 14:54:52 +0800
Subject: [PATCH 1/3] feat: peak ewma lb
---
.../spi/PeakEWMALoadBalancer.java | 145 ++++++++++++++++++
1 file changed, 145 insertions(+)
create mode 100644 shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
new file mode 100644
index 000000000000..18e9c01c4caf
--- /dev/null
+++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.loadbalancer.spi;
+
+import org.apache.shenyu.loadbalancer.entity.Upstream;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ *
+ * PeakEwmaLoadBalance is designed to converge quickly when encountering slow endpoints.
+ * It is quick to react to latency spikes recovering only cautiously.Peak EWMA takes
+ * history into account,so that slow behavior is penalized relative to the
+ * supplied `decayTime`.
+ * if there are multiple invokers and the same cost,then randomly called,which doesn't care
+ * about weight.
+ *
+ * Inspiration drawn from:
+ * https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
+ * /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
+ *
+ * https://github.com/apache/dubbo-spi-extensions/blob/efd18a63468f817a7581fea44e9e2e3f35d9c9ba
+ * /dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache
+ * /dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java#L46
+ */
+public class PeakEWMALoadBalancer extends AbstractLoadBalancer {
+
+ private static final double PENALTY = Long.MAX_VALUE >> 16;
+
+ private static final double ZERO_COST = 1E-6;
+
+ private static double decayTime = 10000;
+
+ @Override
+ protected Upstream doSelect(List upstreamList, String ip) {
+ double minResponse = Double.MAX_VALUE;
+
+ List selectInvokerIndexList = new ArrayList<>(upstreamList.size());
+
+ for (int i = 0; i < upstreamList.size(); i++) {
+ Metric metric = new Metric(upstreamList.get(i));
+ double estimateResponse = metric.getCost();
+
+ if (estimateResponse < minResponse) {
+ selectInvokerIndexList.clear();
+ selectInvokerIndexList.add(i);
+ minResponse = estimateResponse;
+ } else if (estimateResponse == minResponse) {
+ selectInvokerIndexList.add(i);
+ }
+ }
+
+ return upstreamList.get(selectInvokerIndexList.get(ThreadLocalRandom.current().nextInt(selectInvokerIndexList.size())));
+ }
+
+ protected static class Metric {
+
+ /**
+ * last timestamp in Millis we observed an runningTime
+ */
+ private volatile long lastUpdateTime;
+
+ /**
+ * ewma of rtt, sensitive to peaks.
+ */
+ private volatile double cost;
+
+ private Upstream upstream;
+
+ private long invokeOffset;
+
+ private long invokeElapsedOffset;
+
+ //lock for get and set cost
+ ReentrantLock ewmaLock = new ReentrantLock();
+
+ public Metric(Upstream upstream) {
+ this.upstream = upstream;
+ this.lastUpdateTime = System.currentTimeMillis();
+ this.cost = 0.0;
+ this.invokeOffset = 0;
+ this.invokeElapsedOffset = 0;
+ }
+
+ private void observe() {
+ double rtt = 0;
+ long succeed = this.upstream.getSucceeded().get() - this.invokeOffset;
+ if (succeed != 0) {
+ rtt = (this.upstream.getSucceededElapsed().get() * 1.0 - this.invokeElapsedOffset) / succeed;
+ }
+
+ final long currentTime = System.currentTimeMillis();
+ long td = Math.max(currentTime - lastUpdateTime, 0);
+ double w = Math.exp(-td / decayTime);
+ if (rtt > cost) {
+ cost = rtt;
+ } else {
+ cost = cost * w + rtt * (1.0 - w);
+ }
+
+ lastUpdateTime = currentTime;
+
+// invokeOffset = upstream.getTotal();
+// invokeElapsedOffset = upstream.getTotalElapsed();
+
+ }
+
+ private double getCost() {
+ ewmaLock.lock();
+ observe();
+ int active = 0;
+ if (upstream.isHealthy()) {
+ active = 1;
+ }
+
+ ewmaLock.unlock();
+
+ double costTemp = cost;
+
+ //If we don't have any latency history, we penalize the host on the first probe.
+ return (costTemp < ZERO_COST && active != 0) ? PENALTY + active : costTemp * (active + 1);
+ }
+
+ }
+
+
+}
From d819f27c2e43a16e150816e530d3cae9b80d0e58 Mon Sep 17 00:00:00 2001
From: sinsy <550569627@qq.com>
Date: Wed, 31 Jan 2024 10:51:35 +0800
Subject: [PATCH 2/3] add spi
---
.../shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java | 6 ++++--
.../shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer | 3 ++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
index 18e9c01c4caf..e4f580a79fce 100644
--- a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
+++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
@@ -18,6 +18,7 @@
package org.apache.shenyu.loadbalancer.spi;
import org.apache.shenyu.loadbalancer.entity.Upstream;
+import org.apache.shenyu.spi.Join;
import java.util.ArrayList;
import java.util.List;
@@ -41,13 +42,14 @@
* /dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache
* /dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java#L46
*/
+@Join
public class PeakEWMALoadBalancer extends AbstractLoadBalancer {
private static final double PENALTY = Long.MAX_VALUE >> 16;
private static final double ZERO_COST = 1E-6;
- private static double decayTime = 10000;
+ private static final double DECAY_TIME = 600;
@Override
protected Upstream doSelect(List upstreamList, String ip) {
@@ -109,7 +111,7 @@ private void observe() {
final long currentTime = System.currentTimeMillis();
long td = Math.max(currentTime - lastUpdateTime, 0);
- double w = Math.exp(-td / decayTime);
+ double w = Math.exp( -td / DECAY_TIME);
if (rtt > cost) {
cost = rtt;
} else {
diff --git a/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer b/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer
index 109f1f482b1c..f0a096ccf51a 100644
--- a/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer
+++ b/shenyu-loadbalancer/src/main/resources/META-INF/shenyu/org.apache.shenyu.loadbalancer.spi.LoadBalancer
@@ -18,4 +18,5 @@ roundRobin=org.apache.shenyu.loadbalancer.spi.RoundRobinLoadBalancer
hash=org.apache.shenyu.loadbalancer.spi.HashLoadBalancer
leastActive=org.apache.shenyu.loadbalancer.spi.LeastActiveLoadBalance
p2c=org.apache.shenyu.loadbalancer.spi.P2cLoadBalancer
-shortestResponse=org.apache.shenyu.loadbalancer.spi.ShortestResponseLoadBalancer
\ No newline at end of file
+shortestResponse=org.apache.shenyu.loadbalancer.spi.ShortestResponseLoadBalancer
+peakEWMA=org.apache.shenyu.loadbalancer.spi.PeakEWMALoadBalancer
\ No newline at end of file
From 93e619c5e027016722016ce7276732f61532884d Mon Sep 17 00:00:00 2001
From: sinsy <550569627@qq.com>
Date: Wed, 31 Jan 2024 15:48:38 +0800
Subject: [PATCH 3/3] change get metric way
---
.../spi/PeakEWMALoadBalancer.java | 30 +++++++++----------
1 file changed, 14 insertions(+), 16 deletions(-)
diff --git a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
index e4f580a79fce..73c2128574ad 100644
--- a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
+++ b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/spi/PeakEWMALoadBalancer.java
@@ -22,6 +22,8 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,14 +53,21 @@ public class PeakEWMALoadBalancer extends AbstractLoadBalancer {
private static final double DECAY_TIME = 600;
+ private Map upstreamMetricMap = new ConcurrentHashMap<>();
+
@Override
protected Upstream doSelect(List upstreamList, String ip) {
double minResponse = Double.MAX_VALUE;
List selectInvokerIndexList = new ArrayList<>(upstreamList.size());
-
+ Metric metric;
for (int i = 0; i < upstreamList.size(); i++) {
- Metric metric = new Metric(upstreamList.get(i));
+ if (upstreamMetricMap.containsKey(upstreamList.get(i))) {
+ metric = upstreamMetricMap.get(upstreamList.get(i));
+ } else {
+ metric = new Metric(upstreamList.get(i));
+ upstreamMetricMap.put(upstreamList.get(i), metric);
+ }
double estimateResponse = metric.getCost();
if (estimateResponse < minResponse) {
@@ -87,10 +96,6 @@ protected static class Metric {
private Upstream upstream;
- private long invokeOffset;
-
- private long invokeElapsedOffset;
-
//lock for get and set cost
ReentrantLock ewmaLock = new ReentrantLock();
@@ -98,20 +103,16 @@ public Metric(Upstream upstream) {
this.upstream = upstream;
this.lastUpdateTime = System.currentTimeMillis();
this.cost = 0.0;
- this.invokeOffset = 0;
- this.invokeElapsedOffset = 0;
}
private void observe() {
double rtt = 0;
- long succeed = this.upstream.getSucceeded().get() - this.invokeOffset;
- if (succeed != 0) {
- rtt = (this.upstream.getSucceededElapsed().get() * 1.0 - this.invokeElapsedOffset) / succeed;
- }
+
+ rtt = Math.max(this.upstream.getResponseStamp() - this.upstream.getLastPicked(), rtt);
final long currentTime = System.currentTimeMillis();
long td = Math.max(currentTime - lastUpdateTime, 0);
- double w = Math.exp( -td / DECAY_TIME);
+ double w = Math.exp(-td / DECAY_TIME);
if (rtt > cost) {
cost = rtt;
} else {
@@ -120,8 +121,6 @@ private void observe() {
lastUpdateTime = currentTime;
-// invokeOffset = upstream.getTotal();
-// invokeElapsedOffset = upstream.getTotalElapsed();
}
@@ -143,5 +142,4 @@ private double getCost() {
}
-
}